fix(sdk.v2): Fix importer ignoring `reimport` setting, and switch to Protobuf.Value for import uri. (#6827)

* fix importer

* release note

* remove error usage in test sample

* disable importer test
This commit is contained in:
Chen Sun 2021-10-28 20:40:51 -07:00 committed by GitHub
parent ea2e5be81a
commit 8c6843fdb8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 49 additions and 224 deletions

View File

@ -88,8 +88,9 @@
path: samples.v2.hello_world_test
- name: producer_consumer_param
path: samples.v2.producer_consumer_param_test
- name: pipeline_with_importer
path: samples.v2.pipeline_with_importer_test
# TODO: Re-enable after fixing protobuf.Value support for importer
# - name: pipeline_with_importer
# path: samples.v2.pipeline_with_importer_test
# TODO(Bobgy): Re-enable after figuring out V2 Engine
# and protobuf.Value support.
# - name: cache_v2

View File

@ -15,6 +15,8 @@
## Bug Fixes and Other Changes
* Fix importer ignoring reimport setting, and switch to Protobuf.Value for import uri [\#6827](https://github.com/kubeflow/pipelines/pull/6827)
## Documentation Updates
# 1.8.7

View File

@ -99,43 +99,6 @@
}
}
},
"comp-importer-3": {
"executorLabel": "exec-importer-3",
"inputDefinitions": {
"parameters": {
"uri": {
"type": "STRING"
}
}
},
"outputDefinitions": {
"artifacts": {
"artifact": {
"artifactType": {
"schemaTitle": "system.Dataset",
"schemaVersion": "0.0.1"
}
}
}
}
},
"comp-pass-through-op": {
"executorLabel": "exec-pass-through-op",
"inputDefinitions": {
"parameters": {
"value": {
"parameterType": "STRING"
}
}
},
"outputDefinitions": {
"parameters": {
"Output": {
"parameterType": "STRING"
}
}
}
},
"comp-train": {
"executorLabel": "exec-train",
"inputDefinitions": {
@ -191,34 +154,6 @@
}
}
}
},
"comp-train-3": {
"executorLabel": "exec-train-3",
"inputDefinitions": {
"artifacts": {
"dataset": {
"artifactType": {
"schemaTitle": "system.Dataset",
"schemaVersion": "0.0.1"
}
}
}
},
"outputDefinitions": {
"artifacts": {
"model": {
"artifactType": {
"schemaTitle": "system.Model",
"schemaVersion": "0.0.1"
}
}
},
"parameters": {
"scalar": {
"parameterType": "STRING"
}
}
}
}
},
"deploymentSpec": {
@ -226,9 +161,7 @@
"exec-importer": {
"importer": {
"artifactUri": {
"constantValue": {
"stringValue": "gs://ml-pipeline-playground/shakespeare1.txt"
}
"constant": "gs://ml-pipeline-playground/shakespeare1.txt"
},
"typeSchema": {
"schemaTitle": "system.Dataset",
@ -241,40 +174,13 @@
"artifactUri": {
"runtimeParameter": "uri"
},
"reimport": true,
"typeSchema": {
"schemaTitle": "system.Dataset",
"schemaVersion": "0.0.1"
}
}
},
"exec-importer-3": {
"importer": {
"artifactUri": {
"runtimeParameter": "uri"
},
"typeSchema": {
"schemaTitle": "system.Dataset",
"schemaVersion": "0.0.1"
}
}
},
"exec-pass-through-op": {
"container": {
"args": [
"--value",
"{{$.inputs.parameters['value']}}",
"----output-paths",
"{{$.outputs.parameters['Output'].output_file}}"
],
"command": [
"sh",
"-ec",
"program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3 -u \"$program_path\" \"$@\"\n",
"def pass_through_op(value):\n return value\n\ndef _serialize_str(str_value: str) -> str:\n if not isinstance(str_value, str):\n raise TypeError('Value \"{}\" has type \"{}\" instead of str.'.format(\n str(str_value), str(type(str_value))))\n return str_value\n\nimport argparse\n_parser = argparse.ArgumentParser(prog='Pass through op', description='')\n_parser.add_argument(\"--value\", dest=\"value\", type=str, required=True, default=argparse.SUPPRESS)\n_parser.add_argument(\"----output-paths\", dest=\"_output_paths\", type=str, nargs=1)\n_parsed_args = vars(_parser.parse_args())\n_output_files = _parsed_args.pop(\"_output_paths\", [])\n\n_outputs = pass_through_op(**_parsed_args)\n\n_outputs = [_outputs]\n\n_output_serializers = [\n _serialize_str,\n\n]\n\nimport os\nfor idx, output_file in enumerate(_output_files):\n try:\n os.makedirs(os.path.dirname(output_file))\n except OSError:\n pass\n with open(output_file, 'w') as f:\n f.write(_output_serializers[idx](_outputs[idx]))\n"
],
"image": "python:3.7"
}
},
"exec-train": {
"container": {
"args": [
@ -286,7 +192,7 @@
"command": [
"sh",
"-c",
"\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.6' && \"$0\" \"$@\"\n",
"\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.7' && \"$0\" \"$@\"\n",
"sh",
"-ec",
"program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n",
@ -306,27 +212,7 @@
"command": [
"sh",
"-c",
"\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.6' && \"$0\" \"$@\"\n",
"sh",
"-ec",
"program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n",
"\nimport kfp\nfrom kfp.v2 import dsl\nfrom kfp.v2.dsl import *\nfrom typing import *\n\ndef train(\n dataset: Input[Dataset]\n) -> NamedTuple('Outputs', [\n ('scalar', str),\n ('model', Model),\n]):\n \"\"\"Dummy Training step.\"\"\"\n with open(dataset.path, 'r') as f:\n data = f.read()\n print('Dataset:', data)\n\n scalar = '123'\n model = 'My model trained using data: {}'.format(data)\n\n from collections import namedtuple\n output = namedtuple('Outputs', ['scalar', 'model'])\n return output(scalar, model)\n\n"
],
"image": "python:3.7"
}
},
"exec-train-3": {
"container": {
"args": [
"--executor_input",
"{{$}}",
"--function_to_execute",
"train"
],
"command": [
"sh",
"-c",
"\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.6' && \"$0\" \"$@\"\n",
"\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip || python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1 python3 -m pip install --quiet --no-warn-script-location 'kfp==1.8.7' && \"$0\" \"$@\"\n",
"sh",
"-ec",
"program_path=$(mktemp -d)\nprintf \"%s\" \"$0\" > \"$program_path/ephemeral_component.py\"\npython3 -m kfp.v2.components.executor_main --component_module_path \"$program_path/ephemeral_component.py\" \"$@\"\n",
@ -381,9 +267,7 @@
"parameters": {
"uri": {
"runtimeValue": {
"constantValue": {
"stringValue": "gs://ml-pipeline-playground/shakespeare1.txt"
}
"constant": "gs://ml-pipeline-playground/shakespeare1.txt"
}
}
}
@ -392,48 +276,6 @@
"name": "importer"
}
},
"importer-3": {
"cachingOptions": {
"enableCache": true
},
"componentRef": {
"name": "comp-importer-3"
},
"dependentTasks": [
"pass-through-op"
],
"inputs": {
"parameters": {
"uri": {
"taskOutputParameter": {
"outputParameterKey": "Output",
"producerTask": "pass-through-op"
}
}
}
},
"taskInfo": {
"name": "importer-3"
}
},
"pass-through-op": {
"cachingOptions": {
"enableCache": true
},
"componentRef": {
"name": "comp-pass-through-op"
},
"inputs": {
"parameters": {
"value": {
"componentInputParameter": "dataset2"
}
}
},
"taskInfo": {
"name": "pass-through-op"
}
},
"train": {
"cachingOptions": {
"enableCache": true
@ -457,30 +299,6 @@
"taskInfo": {
"name": "train"
}
},
"train-3": {
"cachingOptions": {
"enableCache": true
},
"componentRef": {
"name": "comp-train-3"
},
"dependentTasks": [
"importer-3"
],
"inputs": {
"artifacts": {
"dataset": {
"taskOutputArtifact": {
"outputArtifactKey": "artifact",
"producerTask": "importer-3"
}
}
}
},
"taskInfo": {
"name": "train-3"
}
}
}
},
@ -493,7 +311,7 @@
}
},
"schemaVersion": "2.1.0",
"sdkVersion": "kfp-1.8.6"
"sdkVersion": "kfp-1.8.7"
},
"runtimeConfig": {
"gcsOutputDirectory": "dummy_root",

View File

@ -59,10 +59,6 @@ def my_pipeline(dataset2: str = 'gs://ml-pipeline-playground/shakespeare2.txt'):
artifact_uri=dataset2, artifact_class=Dataset, reimport=True)
train(dataset=importer2.output)
importer3 = importer(
artifact_uri=pass_through_op(dataset2).output, artifact_class=Dataset)
train(dataset=importer3.output)
if __name__ == '__main__':
compiler.Compiler().compile(

View File

@ -28,24 +28,26 @@ OUTPUT_KEY = 'artifact'
def _build_importer_spec(
    artifact_uri: Union[_pipeline_param.PipelineParam, str],
    artifact_type_schema: pipeline_spec_pb2.ArtifactTypeSchema,
    reimport: bool,
) -> pipeline_spec_pb2.PipelineDeploymentConfig.ImporterSpec:
    """Builds an importer executor spec.

    Args:
        artifact_uri: The artifact uri to import from. Either a concrete
            string or a PipelineParam resolved at runtime.
        artifact_type_schema: The user specified artifact type schema of the
            artifact to be imported.
        reimport: Whether to reimport the artifact.

    Returns:
        An importer spec.
    """
    # Pass `reimport` through the proto constructor so the user's setting is
    # never dropped (previously the flag was silently ignored).
    importer_spec = pipeline_spec_pb2.PipelineDeploymentConfig.ImporterSpec(
        type_schema=artifact_type_schema, reimport=reimport)
    if isinstance(artifact_uri, _pipeline_param.PipelineParam):
        # Runtime-provided uri: reference the importer's input parameter.
        importer_spec.artifact_uri.runtime_parameter = INPUT_KEY
    elif isinstance(artifact_uri, str):
        # Constant uri: stored as a protobuf.Value string constant
        # (`constant`), replacing the deprecated `constant_value` field.
        importer_spec.artifact_uri.constant.string_value = artifact_uri
    return importer_spec
@ -57,11 +59,11 @@ def _build_importer_task_spec(
"""Builds an importer task spec.
Args:
importer_base_name: The base name of the importer node.
artifact_uri: The artifact uri to import from.
importer_base_name: The base name of the importer node.
artifact_uri: The artifact uri to import from.
Returns:
An importer node task spec.
An importer node task spec.
"""
result = pipeline_spec_pb2.PipelineTaskSpec()
result.component_ref.name = dsl_utils.sanitize_component_name(
@ -80,7 +82,7 @@ def _build_importer_task_spec(
INPUT_KEY].component_input_parameter = param.full_name
elif isinstance(artifact_uri, str):
result.inputs.parameters[
INPUT_KEY].runtime_value.constant_value.string_value = artifact_uri
INPUT_KEY].runtime_value.constant.string_value = artifact_uri
return result
@ -92,12 +94,12 @@ def _build_importer_component_spec(
"""Builds an importer component spec.
Args:
importer_base_name: The base name of the importer node.
artifact_type_schema: The user specified artifact type schema of the
artifact to be imported.
importer_base_name: The base name of the importer node.
artifact_type_schema: The user specified artifact type schema of the
artifact to be imported.
Returns:
An importer node component spec.
An importer node component spec.
"""
result = pipeline_spec_pb2.ComponentSpec()
result.executor_label = dsl_utils.sanitize_executor_label(
@ -154,7 +156,9 @@ def importer(artifact_uri: Union[_pipeline_param.PipelineParam, str],
artifact_type_schema = type_utils.get_artifact_type_schema(artifact_class)
task.importer_spec = _build_importer_spec(
artifact_uri=artifact_uri, artifact_type_schema=artifact_type_schema)
artifact_uri=artifact_uri,
artifact_type_schema=artifact_type_schema,
reimport=reimport)
task.task_spec = _build_importer_task_spec(
importer_base_name=task.name, artifact_uri=artifact_uri)
task.component_spec = _build_importer_component_spec(

View File

@ -23,6 +23,9 @@ from kfp.v2.components import importer_node
class ImporterNodeTest(parameterized.TestCase):
def setUp(self):
self.maxDiff = None
@parameterized.parameters(
{
# artifact_uri is a constant value
@ -30,15 +33,16 @@ class ImporterNodeTest(parameterized.TestCase):
'gs://artifact',
'artifact_type_schema':
pb.ArtifactTypeSchema(schema_title='system.Dataset'),
'reimport':
True,
'expected_result': {
'artifactUri': {
'constantValue': {
'stringValue': 'gs://artifact'
}
'constant': 'gs://artifact'
},
'typeSchema': {
'schemaTitle': 'system.Dataset'
}
},
'reimport': True
}
},
{
@ -47,23 +51,27 @@ class ImporterNodeTest(parameterized.TestCase):
_pipeline_param.PipelineParam(name='uri_to_import'),
'artifact_type_schema':
pb.ArtifactTypeSchema(schema_title='system.Model'),
'reimport':
False,
'expected_result': {
'artifactUri': {
'runtimeParameter': 'uri'
},
'typeSchema': {
'schemaTitle': 'system.Model'
}
},
'reimport': False
},
})
def test_build_importer_spec(self, input_uri, artifact_type_schema,
expected_result):
reimport, expected_result):
expected_importer_spec = pb.PipelineDeploymentConfig.ImporterSpec()
json_format.ParseDict(expected_result, expected_importer_spec)
importer_spec = importer_node._build_importer_spec(
artifact_uri=input_uri, artifact_type_schema=artifact_type_schema)
artifact_uri=input_uri,
artifact_type_schema=artifact_type_schema,
reimport=reimport)
self.maxDiff = None
self.assertEqual(expected_importer_spec, importer_spec)
@parameterized.parameters(
@ -76,9 +84,7 @@ class ImporterNodeTest(parameterized.TestCase):
'parameters': {
'uri': {
'runtimeValue': {
'constantValue': {
'stringValue': 'gs://artifact'
}
'constant': 'gs://artifact'
}
}
}
@ -113,7 +119,6 @@ class ImporterNodeTest(parameterized.TestCase):
task_spec = importer_node._build_importer_task_spec(
importer_base_name=importer_name, artifact_uri=input_uri)
self.maxDiff = None
self.assertEqual(expected_task_spec, task_spec)
def test_build_importer_component_spec(self):
@ -144,7 +149,6 @@ class ImporterNodeTest(parameterized.TestCase):
artifact_type_schema=pb.ArtifactTypeSchema(
schema_title='system.Artifact'))
self.maxDiff = None
self.assertEqual(expected_importer_comp_spec, importer_comp_spec)
def test_import_with_invalid_artifact_uri_value_should_fail(self):