Add type check samples (#955)

* add core types and type checking function

* fix unit test bug

* avoid defining dynamic classes

* typo fix

* add component metadata format

* add a construct for the component decorator

* add default values for the meta classes

* add input/output types to the metadata

* add from_dict in TypeMeta

* small fix

* add unit tests

* use python struct for the openapi schema

* add default in parameter

* add default value

* remove the str restriction for the param default

* bug fix

* add pipelinemeta

* add pipeline metadata

* ignore annotation if it is not str/BaseType/dict

* update param name in the check_type functions
remove schema validators for GCRPath, and adjust for GCRPath, GCSPath
change _check_valid_dict to _check_valid_type_dict to avoid confusion
fix typo in the comments
adjust function order for readability

* remove default values for non-primitive types in the function signature
update the _check_valid_type_dict name

* pass metadata from component decorator and task factory to containerOp

* pass pipeline metadata to Pipeline

* fix unit test

* typo in the comments

* move the metadata classes to a separate module

* fix unit test

* small change

* add __eq__ to meta classes
do not export _metadata classes

* nothing

* fix unit test

* unit test python component

* unit test python pipeline

* fix bug: duplicate variable of args

* fix unit tests

* move python_component and _component decorator in _component file

* remove the print

* change parameter default value to None

* add functools wraps around _component decorator

* TypeMeta accepts both str and dict

* fix indent, add unit test for type as strings

* do not set default value for the name field in ParameterMeta, ComponentMeta, and PipelineMeta

* add type check in task factory

* output error message

* add type check in component decorator; move the metadata assignment out of the containerop __init__ function

* fix bug; add unit test

* add more unit tests

* more unit tests; fix bugs

* more unit tests; fix bugs

* add unit tests

* more unit tests

* add type check switch; add unit tests

* add compiler option for type check

* add a notebook sample

* resolving pr comments

* add unit test for pipeline param check with component types; fix the bug; also fix the bug when there is not a single return annotation

* add dsl static type checking sample

* fix bug: op_to_template resolves the raw arguments by mapping to the argument_inputs, but the argument_inputs lose the type information

* fix type pattern matching

* convert the OrderedDict from the component module to dict

* add unit test to the pipelineparam with types

* create TypeMeta deserialize function, add comments

* strongly typed pipelineparamtuple

* remove GCSPath fields to avoid artifact type confusion
change the type json schema field name to openAPIV3Schema

* fix unit tests; add unit test for openapishema property

* add comments

* add unit test at the component module; fix bug

* add ignore_type in pipelineparam

* update sample: no artifact types but only parameter types; add pipelineparam ignore_type example

* enable type checking by default

* change openAPIV3Schema to lower case with underscore

* revert change from the merge

* add code blocks, add the benefits of static type checking
add more comments within the code block
add documentation about the type definition in both yaml and decorated components.

* fix the comment

* update dsl.type namespace

This commit is contained in:
Ning 2019-03-27 19:58:42 -07:00 committed by Kubernetes Prow Robot
parent 2c8170ceae
commit f59c25bd04
4 changed files with 809 additions and 6 deletions

View File

@@ -0,0 +1,803 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# KubeFlow Pipeline DSL Static Type Checking\n",
"\n",
"In this notebook, we will demo: \n",
"\n",
"* Defining a KubeFlow pipeline with Python DSL\n",
"* Compile the pipeline with type checking\n",
"\n",
"Static type checking helps users to identify component I/O inconsistencies without running the pipeline. It also shortens the development cycles by catching the errors early. This feature is especially useful in two cases: 1) when the pipeline is huge and manually checking the types is infeasible; 2) when some components are shared ones and the type information is not immediately avaiable to the pipeline authors.\n",
"\n",
"Since this sample focuses on the DSL type checking, we will use components that are not runnable in the system but with various type checking scenarios. \n",
"\n",
"## Component definition\n",
"Components can be defined in either YAML or functions decorated by dsl.component.\n",
"\n",
"## Type definition\n",
"Types can be defined as string or a dictionary with the openapi_schema_validator property formatted as:\n",
"```yaml\n",
"{\n",
" type_name: {\n",
" openapi_schema_validator: {\n",
" }\n",
" }\n",
"}\n",
"```\n",
"For example, the following yaml declares a GCSPath type with the openapi_schema_validator for output field_m.\n",
"The type could also be a plain string, such as the GcsUri. The type name could be either one of the core types or customized ones.\n",
"```yaml\n",
"name: component a\n",
"description: component a desc\n",
"inputs:\n",
" - {name: field_l, type: Integer}\n",
"outputs:\n",
" - {name: field_m, type: {GCSPath: {openapi_schema_validator: {type: string, pattern: \"^gs://.*$\" } }}}\n",
" - {name: field_n, type: customized_type}\n",
" - {name: field_o, type: GcsUri} \n",
"implementation:\n",
" container:\n",
" image: gcr.io/ml-pipeline/component-a\n",
" command: [python3, /pipelines/component/src/train.py]\n",
" args: [\n",
" --field-l, {inputValue: field_l},\n",
" ]\n",
" fileOutputs: \n",
" field_m: /schema.txt\n",
" field_n: /feature.txt\n",
" field_o: /output.txt\n",
"```\n",
"\n",
"If you define the component using the function decorator, there are a list of [core types](https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/dsl/_types.py).\n",
"For example, the following component declares a core type Integer for input field_l while\n",
"declares customized_type for its output field_n.\n",
"\n",
"```python\n",
"@component\n",
"def task_factory_a(field_l: Integer()) -> {'field_m': {'GCSPath': {'openapi_schema_validator': '{\"type\": \"string\", \"pattern\": \"^gs://.*$\"}'}}, \n",
" 'field_n': 'customized_type',\n",
" 'field_o': 'Integer'\n",
" }:\n",
" return ContainerOp(\n",
" name = 'operator a',\n",
" image = 'gcr.io/ml-pipeline/component-a',\n",
" arguments = [\n",
" '--field-l', field_l,\n",
" ],\n",
" file_outputs = {\n",
" 'field_m': '/schema.txt',\n",
" 'field_n': '/feature.txt',\n",
" 'field_o': '/output.txt'\n",
" }\n",
" )\n",
"```\n",
"\n",
"## Type check switch\n",
"Type checking is enabled by default. It can be disabled as --disable-type-check argument if dsl-compile is run in the command line, or `dsl.compiler.Compiler().compile(type_check=False)`.\n",
"\n",
"If one wants to ignore the type for one parameter, call ignore_type() function in [PipelineParam](https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/dsl/_pipeline_param.py).\n",
"\n",
"## How does type checking work?\n",
"DSL compiler checks the type consistencies among components by checking the type_name as well as the openapi_schema_validator. Some special cases are listed here:\n",
"1. Type checking succeed: If the upstream/downstream components lack the type information.\n",
"2. Type checking succeed: If the type check is disabled.\n",
"3. Type checking succeed: If the parameter type is ignored."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Setup"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {
"tags": [
"parameters"
]
},
"outputs": [],
"source": [
"# Configure the KFP_PACKAGE\n",
"KFP_PACKAGE = 'https://storage.googleapis.com/ml-pipeline/release/0.1.12/kfp-experiment.tar.gz'"
]
},
{
"cell_type": "markdown",
"metadata": {
"scrolled": false
},
"source": [
"## Install Pipeline SDK"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Collecting https://storage.googleapis.com/ml-pipeline/release/0.1.12/kfp-experiment.tar.gz\n",
" Using cached https://storage.googleapis.com/ml-pipeline/release/0.1.12/kfp-experiment.tar.gz\n",
"Requirement already satisfied, skipping upgrade: urllib3>=1.15 in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (1.22)\n",
"Requirement already satisfied, skipping upgrade: six>=1.10 in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (1.11.0)\n",
"Requirement already satisfied, skipping upgrade: certifi in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (2018.11.29)\n",
"Requirement already satisfied, skipping upgrade: python-dateutil in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (2.7.5)\n",
"Requirement already satisfied, skipping upgrade: PyYAML in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (3.13)\n",
"Requirement already satisfied, skipping upgrade: google-cloud-storage==1.13.0 in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (1.13.0)\n",
"Requirement already satisfied, skipping upgrade: kubernetes==8.0.0 in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (8.0.0)\n",
"Requirement already satisfied, skipping upgrade: PyJWT==1.6.4 in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (1.6.4)\n",
"Requirement already satisfied, skipping upgrade: cryptography==2.4.2 in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (2.4.2)\n",
"Requirement already satisfied, skipping upgrade: google-auth==1.6.1 in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (1.6.1)\n",
"Requirement already satisfied, skipping upgrade: requests_toolbelt==0.8.0 in /opt/conda/lib/python3.6/site-packages (from kfp==0.1) (0.8.0)\n",
"Requirement already satisfied, skipping upgrade: google-resumable-media>=0.3.1 in /opt/conda/lib/python3.6/site-packages (from google-cloud-storage==1.13.0->kfp==0.1) (0.3.1)\n",
"Requirement already satisfied, skipping upgrade: google-cloud-core<0.29dev,>=0.28.0 in /opt/conda/lib/python3.6/site-packages (from google-cloud-storage==1.13.0->kfp==0.1) (0.28.1)\n",
"Requirement already satisfied, skipping upgrade: google-api-core<2.0.0dev,>=0.1.1 in /opt/conda/lib/python3.6/site-packages (from google-cloud-storage==1.13.0->kfp==0.1) (1.6.0)\n",
"Requirement already satisfied, skipping upgrade: adal>=1.0.2 in /opt/conda/lib/python3.6/site-packages (from kubernetes==8.0.0->kfp==0.1) (1.2.1)\n",
"Requirement already satisfied, skipping upgrade: websocket-client!=0.40.0,!=0.41.*,!=0.42.*,>=0.32.0 in /opt/conda/lib/python3.6/site-packages (from kubernetes==8.0.0->kfp==0.1) (0.54.0)\n",
"Requirement already satisfied, skipping upgrade: requests-oauthlib in /opt/conda/lib/python3.6/site-packages (from kubernetes==8.0.0->kfp==0.1) (1.0.0)\n",
"Requirement already satisfied, skipping upgrade: setuptools>=21.0.0 in /opt/conda/lib/python3.6/site-packages (from kubernetes==8.0.0->kfp==0.1) (38.4.0)\n",
"Requirement already satisfied, skipping upgrade: requests in /opt/conda/lib/python3.6/site-packages (from kubernetes==8.0.0->kfp==0.1) (2.18.4)\n",
"Requirement already satisfied, skipping upgrade: asn1crypto>=0.21.0 in /opt/conda/lib/python3.6/site-packages (from cryptography==2.4.2->kfp==0.1) (0.24.0)\n",
"Requirement already satisfied, skipping upgrade: idna>=2.1 in /opt/conda/lib/python3.6/site-packages (from cryptography==2.4.2->kfp==0.1) (2.6)\n",
"Requirement already satisfied, skipping upgrade: cffi!=1.11.3,>=1.7 in /opt/conda/lib/python3.6/site-packages (from cryptography==2.4.2->kfp==0.1) (1.11.4)\n",
"Requirement already satisfied, skipping upgrade: pyasn1-modules>=0.2.1 in /opt/conda/lib/python3.6/site-packages (from google-auth==1.6.1->kfp==0.1) (0.2.2)\n",
"Requirement already satisfied, skipping upgrade: rsa>=3.1.4 in /opt/conda/lib/python3.6/site-packages (from google-auth==1.6.1->kfp==0.1) (4.0)\n",
"Requirement already satisfied, skipping upgrade: cachetools>=2.0.0 in /opt/conda/lib/python3.6/site-packages (from google-auth==1.6.1->kfp==0.1) (3.0.0)\n",
"Requirement already satisfied, skipping upgrade: pytz in /opt/conda/lib/python3.6/site-packages (from google-api-core<2.0.0dev,>=0.1.1->google-cloud-storage==1.13.0->kfp==0.1) (2018.7)\n",
"Requirement already satisfied, skipping upgrade: googleapis-common-protos!=1.5.4,<2.0dev,>=1.5.3 in /opt/conda/lib/python3.6/site-packages (from google-api-core<2.0.0dev,>=0.1.1->google-cloud-storage==1.13.0->kfp==0.1) (1.5.5)\n",
"Requirement already satisfied, skipping upgrade: protobuf>=3.4.0 in /opt/conda/lib/python3.6/site-packages (from google-api-core<2.0.0dev,>=0.1.1->google-cloud-storage==1.13.0->kfp==0.1) (3.6.1)\n",
"Requirement already satisfied, skipping upgrade: oauthlib>=0.6.2 in /opt/conda/lib/python3.6/site-packages (from requests-oauthlib->kubernetes==8.0.0->kfp==0.1) (2.1.0)\n",
"Requirement already satisfied, skipping upgrade: chardet<3.1.0,>=3.0.2 in /opt/conda/lib/python3.6/site-packages (from requests->kubernetes==8.0.0->kfp==0.1) (3.0.4)\n",
"Requirement already satisfied, skipping upgrade: pycparser in /opt/conda/lib/python3.6/site-packages (from cffi!=1.11.3,>=1.7->cryptography==2.4.2->kfp==0.1) (2.18)\n",
"Requirement already satisfied, skipping upgrade: pyasn1<0.5.0,>=0.4.1 in /opt/conda/lib/python3.6/site-packages (from pyasn1-modules>=0.2.1->google-auth==1.6.1->kfp==0.1) (0.4.4)\n",
"Building wheels for collected packages: kfp\n",
" Running setup.py bdist_wheel for kfp ... \u001b[?25ldone\n",
"\u001b[?25h Stored in directory: /home/jovyan/.cache/pip/wheels/06/14/fc/dd58bcc821d8067efa74a9e217db214d8a075c6b5d31ff24cf\n",
"Successfully built kfp\n",
"Installing collected packages: kfp\n",
" Found existing installation: kfp 0.1\n",
" Uninstalling kfp-0.1:\n",
" Successfully uninstalled kfp-0.1\n",
"Successfully installed kfp-0.1\n",
"\u001b[33mYou are using pip version 18.1, however version 19.0.3 is available.\n",
"You should consider upgrading via the 'pip install --upgrade pip' command.\u001b[0m\n"
]
}
],
"source": [
"!pip3 install $KFP_PACKAGE --upgrade"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Type Check with YAML components: successful scenario"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Author components in YAML"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"# In yaml, one can optionally add the type information to both inputs and outputs.\n",
"# There are two ways to define the types: string or a dictionary with the openapi_schema_validator property.\n",
"# The openapi_schema_validator is a json schema object that describes schema of the parameter value.\n",
"component_a = '''\\\n",
"name: component a\n",
"description: component a desc\n",
"inputs:\n",
" - {name: field_l, type: Integer}\n",
"outputs:\n",
" - {name: field_m, type: {GCSPath: {openapi_schema_validator: {type: string, pattern: \"^gs://.*$\" } }}}\n",
" - {name: field_n, type: customized_type}\n",
" - {name: field_o, type: GcsUri} \n",
"implementation:\n",
" container:\n",
" image: gcr.io/ml-pipeline/component-a\n",
" command: [python3, /pipelines/component/src/train.py]\n",
" args: [\n",
" --field-l, {inputValue: field_l},\n",
" ]\n",
" fileOutputs: \n",
" field_m: /schema.txt\n",
" field_n: /feature.txt\n",
" field_o: /output.txt\n",
"'''\n",
"component_b = '''\\\n",
"name: component b\n",
"description: component b desc\n",
"inputs:\n",
" - {name: field_x, type: customized_type}\n",
" - {name: field_y, type: GcsUri}\n",
" - {name: field_z, type: {GCSPath: {openapi_schema_validator: {type: string, pattern: \"^gs://.*$\" } }}}\n",
"outputs:\n",
" - {name: output_model_uri, type: GcsUri}\n",
"implementation:\n",
" container:\n",
" image: gcr.io/ml-pipeline/component-a\n",
" command: [python3]\n",
" args: [\n",
" --field-x, {inputValue: field_x},\n",
" --field-y, {inputValue: field_y},\n",
" --field-z, {inputValue: field_z},\n",
" ]\n",
" fileOutputs: \n",
" output_model_uri: /schema.txt\n",
"'''"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Author a pipeline with the above components"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [],
"source": [
"import kfp.components as comp\n",
"import kfp.dsl as dsl\n",
"import kfp.compiler as compiler\n",
"# The components are loaded as task factories that generate container_ops.\n",
"task_factory_a = comp.load_component_from_text(text=component_a)\n",
"task_factory_b = comp.load_component_from_text(text=component_b)\n",
"\n",
"#Use the component as part of the pipeline\n",
"@dsl.pipeline(name='type_check_a',\n",
" description='')\n",
"def pipeline_a():\n",
" a = task_factory_a(field_l=12)\n",
" b = task_factory_b(field_x=a.outputs['field_n'], field_y=a.outputs['field_o'], field_z=a.outputs['field_m'])\n",
"\n",
"compiler.Compiler().compile(pipeline_a, 'pipeline_a.tar.gz', type_check=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Type Check with YAML components: failed scenario"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Author components in YAML"
]
},
{
"cell_type": "code",
"execution_count": 3,
"metadata": {},
"outputs": [],
"source": [
"# In this case, the component_a contains an output field_o as GcrUri \n",
"# but the component_b requires an input field_y as GcsUri\n",
"component_a = '''\\\n",
"name: component a\n",
"description: component a desc\n",
"inputs:\n",
" - {name: field_l, type: Integer}\n",
"outputs:\n",
" - {name: field_m, type: {GCSPath: {openapi_schema_validator: {type: string, pattern: \"^gs://.*$\" } }}}\n",
" - {name: field_n, type: customized_type}\n",
" - {name: field_o, type: GcrUri} \n",
"implementation:\n",
" container:\n",
" image: gcr.io/ml-pipeline/component-a\n",
" command: [python3, /pipelines/component/src/train.py]\n",
" args: [\n",
" --field-l, {inputValue: field_l},\n",
" ]\n",
" fileOutputs: \n",
" field_m: /schema.txt\n",
" field_n: /feature.txt\n",
" field_o: /output.txt\n",
"'''\n",
"component_b = '''\\\n",
"name: component b\n",
"description: component b desc\n",
"inputs:\n",
" - {name: field_x, type: customized_type}\n",
" - {name: field_y, type: GcsUri}\n",
" - {name: field_z, type: {GCSPath: {openapi_schema_validator: {type: string, pattern: \"^gs://.*$\" } }}}\n",
"outputs:\n",
" - {name: output_model_uri, type: GcsUri}\n",
"implementation:\n",
" container:\n",
" image: gcr.io/ml-pipeline/component-a\n",
" command: [python3]\n",
" args: [\n",
" --field-x, {inputValue: field_x},\n",
" --field-y, {inputValue: field_y},\n",
" --field-z, {inputValue: field_z},\n",
" ]\n",
" fileOutputs: \n",
" output_model_uri: /schema.txt\n",
"'''"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Author a pipeline with the above components"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"type name GcrUri is different from expected: GcsUri\n",
"Component \"component b\" is expecting field_y to be type(GcsUri), but the passed argument is type(GcrUri)\n"
]
}
],
"source": [
"import kfp.components as comp\n",
"import kfp.dsl as dsl\n",
"import kfp.compiler as compiler\n",
"from kfp.dsl.types import InconsistentTypeException\n",
"task_factory_a = comp.load_component_from_text(text=component_a)\n",
"task_factory_b = comp.load_component_from_text(text=component_b)\n",
"\n",
"#Use the component as part of the pipeline\n",
"@dsl.pipeline(name='type_check_b',\n",
" description='')\n",
"def pipeline_b():\n",
" a = task_factory_a(field_l=12)\n",
" b = task_factory_b(field_x=a.outputs['field_n'], field_y=a.outputs['field_o'], field_z=a.outputs['field_m'])\n",
"\n",
"try:\n",
" compiler.Compiler().compile(pipeline_b, 'pipeline_b.tar.gz', type_check=True)\n",
"except InconsistentTypeException as e:\n",
" print(e)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Author a pipeline with the above components but type checking disabled."
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {},
"outputs": [],
"source": [
"# Disable the type_check\n",
"compiler.Compiler().compile(pipeline_b, 'pipeline_b.tar.gz', type_check=False)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Type Check with decorated components: successful scenario"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Author components with decorator"
]
},
{
"cell_type": "code",
"execution_count": 1,
"metadata": {},
"outputs": [],
"source": [
"from kfp.dsl import component\n",
"from kfp.dsl.types import Integer, GCSPath\n",
"from kfp.dsl import ContainerOp\n",
"# when components are defined based on the component decorator,\n",
"# the type information is annotated to the input or function returns.\n",
"# There are two ways to define the type: string or a dictionary with the openapi_schema_validator property\n",
"@component\n",
"def task_factory_a(field_l: Integer()) -> {'field_m': {'GCSPath': {'openapi_schema_validator': '{\"type\": \"string\", \"pattern\": \"^gs://.*$\"}'}}, \n",
" 'field_n': 'customized_type',\n",
" 'field_o': 'Integer'\n",
" }:\n",
" return ContainerOp(\n",
" name = 'operator a',\n",
" image = 'gcr.io/ml-pipeline/component-a',\n",
" arguments = [\n",
" '--field-l', field_l,\n",
" ],\n",
" file_outputs = {\n",
" 'field_m': '/schema.txt',\n",
" 'field_n': '/feature.txt',\n",
" 'field_o': '/output.txt'\n",
" }\n",
" )\n",
"\n",
"# Users can also use the core types that are pre-defined in the SDK.\n",
"# For a full list of core types, check out: https://github.com/kubeflow/pipelines/blob/master/sdk/python/kfp/dsl/_types.py\n",
"@component\n",
"def task_factory_b(field_x: 'customized_type',\n",
" field_y: Integer(),\n",
" field_z: {'GCSPath': {'openapi_schema_validator': '{\"type\": \"string\", \"pattern\": \"^gs://.*$\"}'}}) -> {'output_model_uri': 'GcsUri'}:\n",
" return ContainerOp(\n",
" name = 'operator b',\n",
" image = 'gcr.io/ml-pipeline/component-a',\n",
" command = [\n",
" 'python3',\n",
" field_x,\n",
" ],\n",
" arguments = [\n",
" '--field-y', field_y,\n",
" '--field-z', field_z,\n",
" ],\n",
" file_outputs = {\n",
" 'output_model_uri': '/schema.txt',\n",
" }\n",
" )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Author a pipeline with the above components"
]
},
{
"cell_type": "code",
"execution_count": 9,
"metadata": {},
"outputs": [],
"source": [
"#Use the component as part of the pipeline\n",
"@dsl.pipeline(name='type_check_c',\n",
" description='')\n",
"def pipeline_c():\n",
" a = task_factory_a(field_l=12)\n",
" b = task_factory_b(field_x=a.outputs['field_n'], field_y=a.outputs['field_o'], field_z=a.outputs['field_m'])\n",
"\n",
"compiler.Compiler().compile(pipeline_c, 'pipeline_c.tar.gz', type_check=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Type Check with decorated components: failure scenario"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Author components with decorator"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"from kfp.dsl import component\n",
"from kfp.dsl.types import Integer, GCSPath\n",
"from kfp.dsl import ContainerOp\n",
"# task_factory_a outputs an input field_m with the openapi_schema_validator different\n",
"# from the task_factory_b's input field_z.\n",
"# One is gs:// and the other is gcs://\n",
"@component\n",
"def task_factory_a(field_l: Integer()) -> {'field_m': {'GCSPath': {'openapi_schema_validator': '{\"type\": \"string\", \"pattern\": \"^gs://.*$\"}'}}, \n",
" 'field_n': 'customized_type',\n",
" 'field_o': 'Integer'\n",
" }:\n",
" return ContainerOp(\n",
" name = 'operator a',\n",
" image = 'gcr.io/ml-pipeline/component-a',\n",
" arguments = [\n",
" '--field-l', field_l,\n",
" ],\n",
" file_outputs = {\n",
" 'field_m': '/schema.txt',\n",
" 'field_n': '/feature.txt',\n",
" 'field_o': '/output.txt'\n",
" }\n",
" )\n",
"\n",
"@component\n",
"def task_factory_b(field_x: 'customized_type',\n",
" field_y: Integer(),\n",
" field_z: {'GCSPath': {'openapi_schema_validator': '{\"type\": \"string\", \"pattern\": \"^gcs://.*$\"}'}}) -> {'output_model_uri': 'GcsUri'}:\n",
" return ContainerOp(\n",
" name = 'operator b',\n",
" image = 'gcr.io/ml-pipeline/component-a',\n",
" command = [\n",
" 'python3',\n",
" field_x,\n",
" ],\n",
" arguments = [\n",
" '--field-y', field_y,\n",
" '--field-z', field_z,\n",
" ],\n",
" file_outputs = {\n",
" 'output_model_uri': '/schema.txt',\n",
" }\n",
" )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Author a pipeline with the above components"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"GCSPath has a property openapi_schema_validator with value: {\"type\": \"string\", \"pattern\": \"^gs://.*$\"} and {\"type\": \"string\", \"pattern\": \"^gcs://.*$\"}\n",
"Component \"task_factory_b\" is expecting field_z to be type({'GCSPath': {'openapi_schema_validator': '{\"type\": \"string\", \"pattern\": \"^gcs://.*$\"}'}}), but the passed argument is type({'GCSPath': {'openapi_schema_validator': '{\"type\": \"string\", \"pattern\": \"^gs://.*$\"}'}})\n"
]
}
],
"source": [
"#Use the component as part of the pipeline\n",
"@dsl.pipeline(name='type_check_d',\n",
" description='')\n",
"def pipeline_d():\n",
" a = task_factory_a(field_l=12)\n",
" b = task_factory_b(field_x=a.outputs['field_n'], field_y=a.outputs['field_o'], field_z=a.outputs['field_m'])\n",
"\n",
"try:\n",
" compiler.Compiler().compile(pipeline_d, 'pipeline_d.tar.gz', type_check=True)\n",
"except InconsistentTypeException as e:\n",
" print(e)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Author a pipeline with the above components but ignoring types."
]
},
{
"cell_type": "code",
"execution_count": 12,
"metadata": {},
"outputs": [],
"source": [
"#Use the component as part of the pipeline\n",
"@dsl.pipeline(name='type_check_d',\n",
" description='')\n",
"def pipeline_d():\n",
" a = task_factory_a(field_l=12)\n",
" # For each of the arguments, authors can also ignore the types by calling ignore_type function.\n",
" b = task_factory_b(field_x=a.outputs['field_n'], field_y=a.outputs['field_o'], field_z=a.outputs['field_m'].ignore_type())\n",
"compiler.Compiler().compile(pipeline_d, 'pipeline_d.tar.gz', type_check=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Type Check with missing type information"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Author components(with missing types)"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"from kfp.dsl import component\n",
"from kfp.dsl.types import Integer, GCSPath\n",
"from kfp.dsl import ContainerOp\n",
"# task_factory_a lacks the type information for output filed_n\n",
"# task_factory_b lacks the type information for input field_y\n",
"# When no type information is provided, it matches all types.\n",
"@component\n",
"def task_factory_a(field_l: Integer()) -> {'field_m': {'GCSPath': {'openapi_schema_validator': '{\"type\": \"string\", \"pattern\": \"^gs://.*$\"}'}}, \n",
" 'field_o': 'Integer'\n",
" }:\n",
" return ContainerOp(\n",
" name = 'operator a',\n",
" image = 'gcr.io/ml-pipeline/component-a',\n",
" arguments = [\n",
" '--field-l', field_l,\n",
" ],\n",
" file_outputs = {\n",
" 'field_m': '/schema.txt',\n",
" 'field_n': '/feature.txt',\n",
" 'field_o': '/output.txt'\n",
" }\n",
" )\n",
"\n",
"@component\n",
"def task_factory_b(field_x: 'customized_type',\n",
" field_y,\n",
" field_z: {'GCSPath': {'openapi_schema_validator': '{\"type\": \"string\", \"pattern\": \"^gs://.*$\"}'}}) -> {'output_model_uri': 'GcsUri'}:\n",
" return ContainerOp(\n",
" name = 'operator b',\n",
" image = 'gcr.io/ml-pipeline/component-a',\n",
" command = [\n",
" 'python3',\n",
" field_x,\n",
" ],\n",
" arguments = [\n",
" '--field-y', field_y,\n",
" '--field-z', field_z,\n",
" ],\n",
" file_outputs = {\n",
" 'output_model_uri': '/schema.txt',\n",
" }\n",
" )"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Author a pipeline with the above components"
]
},
{
"cell_type": "code",
"execution_count": 14,
"metadata": {},
"outputs": [],
"source": [
"#Use the component as part of the pipeline\n",
"@dsl.pipeline(name='type_check_e',\n",
" description='')\n",
"def pipeline_e():\n",
" a = task_factory_a(field_l=12)\n",
" b = task_factory_b(field_x=a.outputs['field_n'], field_y=a.outputs['field_o'], field_z=a.outputs['field_m'])\n",
"\n",
"compiler.Compiler().compile(pipeline_e, 'pipeline_e.tar.gz', type_check=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Type Check with both named arguments and positional arguments"
]
},
{
"cell_type": "code",
"execution_count": 15,
"metadata": {},
"outputs": [],
"source": [
"#Use the component as part of the pipeline\n",
"@dsl.pipeline(name='type_check_f',\n",
" description='')\n",
"def pipeline_f():\n",
" a = task_factory_a(field_l=12)\n",
" b = task_factory_b(a.outputs['field_n'], a.outputs['field_o'], field_z=a.outputs['field_m'])\n",
"\n",
"compiler.Compiler().compile(pipeline_f, 'pipeline_f.tar.gz', type_check=True)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Type Check between pipeline parameters and component parameters"
]
},
{
"cell_type": "code",
"execution_count": 6,
"metadata": {},
"outputs": [],
"source": [
"@component\n",
"def task_factory_a(field_m: {'GCSPath': {'openapi_schema_validator': '{\"type\": \"string\", \"pattern\": \"^gs://.*$\"}'}}, field_o: 'Integer'):\n",
" return ContainerOp(\n",
" name = 'operator a',\n",
" image = 'gcr.io/ml-pipeline/component-b',\n",
" arguments = [\n",
" '--field-l', field_m,\n",
" '--field-o', field_o,\n",
" ],\n",
" )\n",
"\n",
"# Pipeline input types are also checked against the component I/O types.\n",
"@dsl.pipeline(name='type_check_g',\n",
" description='')\n",
"def pipeline_g(a: {'GCSPath': {'openapi_schema_validator': '{\"type\": \"string\", \"pattern\": \"^gs://.*$\"}'}}='good', b: Integer()=12):\n",
" task_factory_a(field_m=a, field_o=b)\n",
"\n",
"try:\n",
" compiler.Compiler().compile(pipeline_g, 'pipeline_g.tar.gz', type_check=True)\n",
"except InconsistentTypeException as e:\n",
" print(e)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Clean up"
]
},
{
"cell_type": "code",
"execution_count": 17,
"metadata": {},
"outputs": [],
"source": [
"from pathlib import Path\n",
"for p in Path(\".\").glob(\"pipeline_[a-g].tar.gz\"):\n",
" p.unlink()"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}

View File

@@ -12,4 +12,4 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#TODO: wrap the DSL level configuration into one Config
-TYPE_CHECK = False
+TYPE_CHECK = True

View File

@@ -673,7 +673,7 @@ class Compiler(object):
workflow = self._create_pipeline_workflow(args_list_with_defaults, p)
return workflow
-def compile(self, pipeline_func, package_path, type_check=False):
+def compile(self, pipeline_func, package_path, type_check=True):
"""Compile the given pipeline function into workflow yaml.
Args:

View File

@@ -43,9 +43,9 @@ def parse_arguments():
type=str,
required=True,
help='local path to the output workflow yaml file.')
-parser.add_argument('--type-check',
+parser.add_argument('--disable-type-check',
action='store_true',
-help='enable the type check, default is disabled.')
+help='disable the type check, default is enabled.')
args = parser.parse_args()
return args
@@ -100,9 +100,9 @@ def main():
(args.py is not None and args.package is not None)):
raise ValueError('Either --py or --package is needed but not both.')
if args.py:
-compile_pyfile(args.py, args.function, args.output, args.type_check)
+compile_pyfile(args.py, args.function, args.output, not args.disable_type_check)
else:
if args.namespace is None:
raise ValueError('--namespace is required for compiling packages.')
-compile_package(args.package, args.namespace, args.function, args.output, args.type_check)
+compile_package(args.package, args.namespace, args.function, args.output, not args.disable_type_check)