fix(sdk.v2): Support dict, list, bool typed input parameters from constant values and pipeline inputs. (#6523)

* fix dict list typed inputs support

* Support passing dict, list, bool typed parameter via client

* update release note

* fix test
This commit is contained in:
Chen Sun 2021-09-07 19:53:07 -07:00 committed by GitHub
parent 439d8c87c1
commit 0fba85cfab
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 122 additions and 42 deletions

View File

@@ -15,6 +15,7 @@
## Bug Fixes and Other Changes
* Remove dead code on importer check in v1. [\#6508](https://github.com/kubeflow/pipelines/pull/6508)
* Fix issue where dict, list, bool typed input parameters don't accept constant values or pipeline inputs. [\#6523](https://github.com/kubeflow/pipelines/pull/6523)
* Depends on `kfp-pipeline-spec>=0.1.10,<0.2.0` [\#6515](https://github.com/kubeflow/pipelines/pull/6515)
## Documentation Updates

View File

@@ -15,6 +15,7 @@
import collections
import copy
import inspect
import json
import pathlib
from typing import Any, Mapping, Optional
@@ -577,20 +578,29 @@ def _attach_v2_specs(
elif isinstance(argument_value, int):
argument_type = 'Integer'
pipeline_task_spec.inputs.parameters[
input_name].runtime_value.constant_value.int_value = argument_value
input_name].runtime_value.constant_value.int_value = (
argument_value)
elif isinstance(argument_value, float):
argument_type = 'Float'
pipeline_task_spec.inputs.parameters[
input_name].runtime_value.constant_value.double_value = argument_value
input_name].runtime_value.constant_value.double_value = (
argument_value)
elif isinstance(argument_value,
(dict, list, bool)) and kfp.COMPILING_FOR_V2:
argument_type = type(argument_value).__name__
pipeline_task_spec.inputs.parameters[
input_name].runtime_value.constant_value.string_value = (
json.dumps(argument_value))
elif isinstance(argument_value, _container_op.ContainerOp):
raise TypeError(
'ContainerOp object {} was passed to component as an input argument. '
'Pass a single output instead.'.format(input_name))
f'ContainerOp object {input_name} was passed to component as an '
'input argument. Pass a single output instead.')
else:
if kfp.COMPILING_FOR_V2:
raise NotImplementedError(
'Input argument supports only the following types: PipelineParam'
', str, int, float. Got: "{}".'.format(argument_value))
'Input argument supports only the following types: '
'PipelineParam, str, int, float, bool, dict, and list. Got: '
f'"{argument_value}".')
argument_is_parameter_type = type_utils.is_parameter_type(argument_type)
input_is_parameter_type = type_utils.is_parameter_type(input_type)

View File

@@ -23,20 +23,20 @@ import inspect
import json
import uuid
import warnings
from typing import Any, Callable, Dict, List, Mapping, Optional, Set, Tuple, Union
from google.protobuf import json_format
from typing import (Any, Callable, Dict, List, Mapping, Optional, Set, Tuple,
Union)
import kfp
from kfp.compiler._k8s_helper import sanitize_k8s_name
from google.protobuf import json_format
from kfp import dsl
from kfp.compiler._k8s_helper import sanitize_k8s_name
from kfp.dsl import _for_loop
from kfp.v2.compiler import compiler_utils
from kfp.dsl import component_spec as dsl_component_spec
from kfp.dsl import dsl_utils
from kfp.pipeline_spec import pipeline_spec_pb2
from kfp.v2.components.types import artifact_types, type_utils
from kfp.v2.compiler import compiler_utils
from kfp.v2.components import component_factory
from kfp.v2.components.types import artifact_types, type_utils
_GroupOrOp = Union[dsl.OpsGroup, dsl.BaseOp]
@@ -1094,11 +1094,12 @@ class Compiler(object):
break
if not type_utils.is_parameter_type(arg_type):
raise TypeError(
'The pipeline argument "{arg_name}" is viewed as an artifact due to '
'its type "{arg_type}". And we currently do not support passing '
'artifacts as pipeline inputs. Consider type annotating the argument'
' with a primitive type, such as "str", "int", and "float".'
.format(arg_name=arg_name, arg_type=arg_type))
'The pipeline argument "{arg_name}" is viewed as an artifact'
' due to its type "{arg_type}". And we currently do not '
'support passing artifacts as pipeline inputs. Consider type'
' annotating the argument with a primitive type, such as '
'"str", "int", "float", "bool", "dict", and "list".'.format(
arg_name=arg_name, arg_type=arg_type))
args_list.append(
dsl.PipelineParam(
sanitize_k8s_name(arg_name, True), param_type=arg_type))

View File

@@ -268,10 +268,11 @@ class CompilerTest(unittest.TestCase):
with self.assertRaisesRegex(
TypeError,
'The pipeline argument \"input1\" is viewed as an artifact due to its '
'type \"None\". And we currently do not support passing artifacts as '
'pipeline inputs. Consider type annotating the argument with a primitive'
' type, such as \"str\", \"int\", and \"float\".'):
'The pipeline argument \"input1\" is viewed as an artifact due '
'to its type \"None\". And we currently do not support passing '
'artifacts as pipeline inputs. Consider type annotating the '
'argument with a primitive type, such as \"str\", \"int\", '
'\"float\", \"bool\", \"dict\", and \"list\".'):
compiler.Compiler().compile(
pipeline_func=my_pipeline, package_path='output.json')

View File

@@ -5,6 +5,12 @@
"executorLabel": "exec-preprocess",
"inputDefinitions": {
"parameters": {
"input_dict_parameter": {
"type": "STRING"
},
"input_list_parameter": {
"type": "STRING"
},
"message": {
"type": "STRING"
}
@@ -101,11 +107,11 @@
"command": [
"sh",
"-c",
"(python3 -m ensurepip || python3 -m ensurepip --user) && (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.7.2' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.7.2' --user) && \"$0\" \"$@\"",
"(python3 -m ensurepip || python3 -m ensurepip --user) && (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.0' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.0' --user) && \"$0\" \"$@\"",
"sh",
"-ec",
"program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n",
"\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef preprocess(\n # An input parameter of type string.\n message: str,\n # Use Output[T] to get a metadata-rich handle to the output artifact\n # of type `Dataset`.\n output_dataset_one: Output[Dataset],\n # A locally accessible filepath for another output artifact of type\n # `Dataset`.\n output_dataset_two_path: OutputPath('Dataset'),\n # A locally accessible filepath for an output parameter of type string.\n output_parameter_path: OutputPath(str),\n # A locally accessible filepath for an output parameter of type bool.\n output_bool_parameter_path: OutputPath(bool),\n # A locally accessible filepath for an output parameter of type dict.\n output_dict_parameter_path: OutputPath(Dict[str, int]),\n # A locally accessible filepath for an output parameter of type list.\n output_list_parameter_path: OutputPath(List[str]),\n):\n \"\"\"Dummy preprocessing step.\"\"\"\n\n # Use Dataset.path to access a local file path for writing.\n # One can also use Dataset.uri to access the actual URI file path.\n with open(output_dataset_one.path, 'w') as f:\n f.write(message)\n\n # OutputPath is used to just pass the local file path of the output artifact\n # to the function.\n with open(output_dataset_two_path, 'w') as f:\n f.write(message)\n\n with open(output_parameter_path, 'w') as f:\n f.write(message)\n\n with open(output_bool_parameter_path, 'w') as f:\n f.write(\n str(True)) # use either `str()` or `json.dumps()` for bool values.\n\n import json\n with open(output_dict_parameter_path, 'w') as f:\n f.write(json.dumps({'A': 1, 'B': 2}))\n\n with open(output_list_parameter_path, 'w') as f:\n f.write(json.dumps(['a', 'b', 'c']))\n\n"
"\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef preprocess(\n # An input parameter of type string.\n message: str,\n # An input parameter of type dict.\n input_dict_parameter: Dict[str, int],\n # An input parameter of type list.\n input_list_parameter: List[str],\n # Use Output[T] to get a metadata-rich handle to the output artifact\n # of type `Dataset`.\n output_dataset_one: Output[Dataset],\n # A locally accessible filepath for another output artifact of type\n # `Dataset`.\n output_dataset_two_path: OutputPath('Dataset'),\n # A locally accessible filepath for an output parameter of type string.\n output_parameter_path: OutputPath(str),\n # A locally accessible filepath for an output parameter of type bool.\n output_bool_parameter_path: OutputPath(bool),\n # A locally accessible filepath for an output parameter of type dict.\n output_dict_parameter_path: OutputPath(Dict[str, int]),\n # A locally accessible filepath for an output parameter of type list.\n output_list_parameter_path: OutputPath(List[str]),\n):\n \"\"\"Dummy preprocessing step.\"\"\"\n\n # Use Dataset.path to access a local file path for writing.\n # One can also use Dataset.uri to access the actual URI file path.\n with open(output_dataset_one.path, 'w') as f:\n f.write(message)\n\n # OutputPath is used to just pass the local file path of the output artifact\n # to the function.\n with open(output_dataset_two_path, 'w') as f:\n f.write(message)\n\n with open(output_parameter_path, 'w') as f:\n f.write(message)\n\n with open(output_bool_parameter_path, 'w') as f:\n f.write(\n str(True)) # use either `str()` or `json.dumps()` for bool values.\n\n import json\n with open(output_dict_parameter_path, 'w') as f:\n f.write(json.dumps(input_dict_parameter))\n\n with open(output_list_parameter_path, 'w') as f:\n f.write(json.dumps(input_list_parameter))\n\n"
],
"image": "python:3.7"
}
@@ -121,7 +127,7 @@
"command": [
"sh",
"-c",
"(python3 -m ensurepip || python3 -m ensurepip --user) && (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.7.2' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.7.2' --user) && \"$0\" \"$@\"",
"(python3 -m ensurepip || python3 -m ensurepip --user) && (PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.0' || PIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.0' --user) && \"$0\" \"$@\"",
"sh",
"-ec",
"program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n",
@@ -147,6 +153,16 @@
},
"inputs": {
"parameters": {
"input_dict_parameter": {
"componentInputParameter": "input_dict"
},
"input_list_parameter": {
"runtimeValue": {
"constantValue": {
"stringValue": "[\"a\", \"b\", \"c\"]"
}
}
},
"message": {
"componentInputParameter": "message"
}
@@ -223,6 +239,9 @@
},
"inputDefinitions": {
"parameters": {
"input_dict": {
"type": "STRING"
},
"message": {
"type": "STRING"
}
@@ -230,9 +249,14 @@
}
},
"schemaVersion": "2.0.0",
"sdkVersion": "kfp-1.7.2"
"sdkVersion": "kfp-1.8.0"
},
"runtimeConfig": {
"gcsOutputDirectory": "dummy_root"
"gcsOutputDirectory": "dummy_root",
"parameters": {
"input_dict": {
"stringValue": "{\"A\": 1, \"B\": 2}"
}
}
}
}

View File

@@ -14,15 +14,20 @@
"""Sample pipeline for passing data in KFP v2."""
from typing import Dict, List
from kfp.v2 import dsl
from kfp.v2.dsl import Input, InputPath, Output, OutputPath, Dataset, Model, component
import kfp.v2.compiler as compiler
from kfp.v2 import dsl
from kfp.v2.dsl import (Dataset, Input, InputPath, Model, Output, OutputPath,
component)
@component
def preprocess(
# An input parameter of type string.
message: str,
# An input parameter of type dict.
input_dict_parameter: Dict[str, int],
# An input parameter of type list.
input_list_parameter: List[str],
# Use Output[T] to get a metadata-rich handle to the output artifact
# of type `Dataset`.
output_dataset_one: Output[Dataset],
@@ -59,10 +64,10 @@ def preprocess(
import json
with open(output_dict_parameter_path, 'w') as f:
f.write(json.dumps({'A': 1, 'B': 2}))
f.write(json.dumps(input_dict_parameter))
with open(output_list_parameter_path, 'w') as f:
f.write(json.dumps(['a', 'b', 'c']))
f.write(json.dumps(input_list_parameter))
@component
@@ -111,9 +116,13 @@ def train(
@dsl.pipeline(pipeline_root='dummy_root', name='my-test-pipeline-beta')
def pipeline(message: str):
def pipeline(message: str, input_dict: Dict[str, int] = {'A': 1, 'B': 2}):
preprocess_task = preprocess(message=message)
preprocess_task = preprocess(
message=message,
input_dict_parameter=input_dict,
input_list_parameter=['a', 'b', 'c'],
)
train_task = train(
dataset_one_path=preprocess_task.outputs['output_dataset_one'],
dataset_two=preprocess_task.outputs['output_dataset_two_path'],

View File

@@ -16,15 +16,12 @@
import datetime
import json
import os
from typing import Any, Dict
import unittest
from typing import Any, Dict
from unittest import mock
from googleapiclient import discovery
from googleapiclient import http
from kfp.v2.google.client import client
from kfp.v2.google.client import client_utils
from googleapiclient import discovery, http
from kfp.v2.google.client import client, client_utils
# Mock response for get job request.
_EXPECTED_GET_RESPONSE = 'good job spec'
@@ -124,7 +121,10 @@ class ClientTest(unittest.TestCase):
job_spec_path='path/to/pipeline_job.json',
job_id='my-new-id',
pipeline_root='gs://bucket/new-blob',
parameter_values={'text': 'Hello test!'})
parameter_values={
'text': 'Hello test!',
'list': [1, 2, 3],
})
golden = _load_test_data('pipeline_job.json')
golden['name'] = ('projects/test-project/locations/us-central1/'
@@ -135,6 +135,9 @@
golden['runtimeConfig']['parameters']['text'] = {
'stringValue': 'Hello test!'
}
golden['runtimeConfig']['parameters']['list'] = {
'stringValue': '[1, 2, 3]'
}
mock_submit.assert_called_once_with(
job_spec=golden, job_id='my-new-id')

View File

@@ -14,6 +14,7 @@
"""Builder for CAIP pipelines Pipeline level proto spec."""
import copy
import json
from typing import Any, Dict, Mapping, Optional, Union
@@ -79,6 +80,10 @@ class RuntimeConfigBuilder(object):
Args:
parameter_values: The mapping from runtime parameter names to its values.
"""
if parameter_values:
for k, v in parameter_values.items():
if isinstance(v, (dict, list, bool)):
parameter_values[k] = json.dumps(v)
if parameter_values:
self._parameter_values.update(parameter_values)

View File

@@ -13,9 +13,9 @@
# limitations under the License.
"""Tests for kfp.v2.google.client.runtime_config_builder."""
import frozendict
import unittest
import frozendict
from kfp.v2.google.client import runtime_config_builder
@@ -37,7 +37,16 @@ class RuntimeConfigBuilderTest(unittest.TestCase):
},
'new_param': {
'type': 'STRING'
}
},
'bool_param': {
'type': 'STRING'
},
'dict_param': {
'type': 'STRING'
},
'list_param': {
'type': 'STRING'
},
}
}
}
@@ -103,7 +112,12 @@ class RuntimeConfigBuilderTest(unittest.TestCase):
my_builder.update_pipeline_root('path/to/my/new/root')
my_builder.update_runtime_parameters({
'int_param': 888,
'new_param': 'new-string'
'new_param': 'new-string',
'dict_param': {
'a': 1
},
'list_param': [1, 2, 3],
'bool_param': True,
})
actual_runtime_config = my_builder.build()
@@ -122,6 +136,15 @@ class RuntimeConfigBuilderTest(unittest.TestCase):
'new_param': {
'stringValue': 'new-string'
},
'dict_param': {
'stringValue': '{"a": 1}'
},
'list_param': {
'stringValue': '[1, 2, 3]'
},
'bool_param': {
'stringValue': 'true'
},
}
}
self.assertEqual(expected_runtime_config, actual_runtime_config)

View File

@@ -279,6 +279,9 @@
"parameters": {
"text": {
"type": "STRING"
},
"list": {
"type": "STRING"
}
}
}