feat(sdk.v2): Support explicit importer (#5502)

This commit is contained in:
Chen Sun 2021-04-19 13:14:09 -07:00 committed by GitHub
parent 075b83af9c
commit 5dafda1270
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
17 changed files with 806 additions and 974 deletions

View File

@ -681,6 +681,9 @@ class Compiler(object):
if hasattr(op, 'custom_job_spec'):
warnings.warn('CustomJob spec is not supported yet when running on KFP.'
' The component will execute within the KFP cluster.')
if hasattr(op, 'importer_spec'):
raise NotImplementedError(
'dsl.importer is not supported yet when running on KFP.')
return templates

View File

@ -21,6 +21,7 @@ from ._pipeline_volume import PipelineVolume
from ._volume_snapshot_op import VolumeSnapshotOp
from ._ops_group import OpsGroup, ExitHandler, Condition, ParallelFor, SubGraph
from ._component import python_component, graph_component, component
from .importer_node import importer
EXECUTION_ID_PLACEHOLDER = '{{workflow.uid}}-{{pod.name}}'
RUN_ID_PLACEHOLDER = '{{workflow.uid}}'

View File

@ -27,7 +27,6 @@ from kfp.dsl import types
from kfp.dsl import component_spec as dsl_component_spec
from kfp.dsl import _container_op
from kfp.dsl import dsl_utils
from kfp.dsl import importer_node
from kfp.dsl import type_utils
from kfp.pipeline_spec import pipeline_spec_pb2
@ -368,15 +367,6 @@ def _attach_v2_specs(
raise TypeError('Input "{}" with type "{}" cannot be paired with '
'InputPathPlaceholder.'.format(
input_key, inputs_dict[input_key].type))
elif is_compiling_for_v2 and input_key in importer_specs:
raise TypeError(
'Input "{}" with type "{}" is not connected to any upstream output. '
'However it is used with InputPathPlaceholder. '
'If you want to import an existing artifact using a system-connected'
' importer node, use InputUriPlaceholder instead. '
'Or if you just want to pass a string parameter, use string type and'
' InputValuePlaceholder instead.'.format(
input_key, inputs_dict[input_key].type))
else:
return "{{{{$.inputs.artifacts['{}'].path}}}}".format(input_key)
@ -463,9 +453,6 @@ def _attach_v2_specs(
pipeline_task_spec = pipeline_spec_pb2.PipelineTaskSpec()
# Keep track of auto-injected importer spec.
importer_specs = {}
# Check types of the reference arguments and serialize PipelineParams
original_arguments = arguments
arguments = arguments.copy()
@ -507,17 +494,6 @@ def _attach_v2_specs(
pipeline_task_spec.inputs.artifacts[
input_name].task_output_artifact.output_artifact_key = (
argument_value.name)
elif is_compiling_for_v2:
# argument_value.op_name could be none, in which case an importer node
# will be inserted later.
# Importer node is only applicable for v2 engine.
pipeline_task_spec.inputs.artifacts[
input_name].task_output_artifact.producer_task = ''
type_schema = type_utils.get_input_artifact_type_schema(
input_name, component_spec.inputs)
importer_specs[input_name] = importer_node.build_importer_spec(
input_type_schema=type_schema,
pipeline_param_name=argument_value.name)
elif isinstance(argument_value, str):
pipeline_params = _pipeline_param.extract_pipelineparams_from_any(
argument_value)
@ -557,15 +533,6 @@ def _attach_v2_specs(
pipeline_task_spec.inputs.parameters[
input_name].runtime_value.constant_value.string_value = (
argument_value)
elif is_compiling_for_v2:
# An importer node with constant value artifact_uri will be inserted.
# Importer node is only applicable for v2 engine.
pipeline_task_spec.inputs.artifacts[
input_name].task_output_artifact.producer_task = ''
type_schema = type_utils.get_input_artifact_type_schema(
input_name, component_spec.inputs)
importer_specs[input_name] = importer_node.build_importer_spec(
input_type_schema=type_schema, constant_value=argument_value)
elif isinstance(argument_value, int):
pipeline_task_spec.inputs.parameters[
input_name].runtime_value.constant_value.int_value = argument_value
@ -606,7 +573,6 @@ def _attach_v2_specs(
component_spec, executor_label, arguments.keys())
task.task_spec = pipeline_task_spec
task.importer_specs = importer_specs
# Override command and arguments if compiling to v2.
if is_compiling_for_v2:

View File

@ -13,53 +13,53 @@
# limitations under the License.
"""Utility function for building Importer Node spec."""
from typing import Optional
from typing import Optional, Union, Type
from kfp.dsl import _container_op
from kfp.dsl import _pipeline_param
from kfp.dsl import dsl_utils
from kfp.dsl import io_types
from kfp.dsl import type_utils
from kfp.pipeline_spec import pipeline_spec_pb2
OUTPUT_KEY = 'result'
INPUT_KEY = 'uri'
OUTPUT_KEY = 'artifact'
def build_importer_spec(
input_type_schema: pipeline_spec_pb2.ArtifactTypeSchema,
pipeline_param_name: Optional[str] = None,
constant_value: Optional[str] = None
def _build_importer_spec(
artifact_uri: Union[_pipeline_param.PipelineParam, str],
artifact_type_schema: pipeline_spec_pb2.ArtifactTypeSchema,
) -> pipeline_spec_pb2.PipelineDeploymentConfig.ImporterSpec:
"""Builds an importer executor spec.
Args:
input_type_schema: The type of the input artifact.
pipeline_param_name: The name of the pipeline parameter if the importer gets
its artifacts_uri via a pipeline parameter. This argument is mutually
exclusive with constant_value.
constant_value: The value of artifact_uri in case a constant value is passed
directly into the component op. This argument is mutually exclusive with
pipeline_param_name.
artifact_uri: The artifact uri to import from.
artifact_type_schema: The user specified artifact type schema of the
artifact to be imported.
Returns:
An importer spec.
"""
assert bool(pipeline_param_name is None) != bool(constant_value is None), (
'importer spec should be built using either pipeline_param_name or '
'constant_value.')
importer_spec = pipeline_spec_pb2.PipelineDeploymentConfig.ImporterSpec()
importer_spec.type_schema.CopyFrom(input_type_schema)
# TODO: subject to IR change on artifact_uri message type.
if pipeline_param_name:
importer_spec.artifact_uri.runtime_parameter = pipeline_param_name
elif constant_value:
importer_spec.artifact_uri.constant_value.string_value = constant_value
importer_spec.type_schema.CopyFrom(artifact_type_schema)
if isinstance(artifact_uri, _pipeline_param.PipelineParam):
importer_spec.artifact_uri.runtime_parameter = INPUT_KEY
elif isinstance(artifact_uri, str):
importer_spec.artifact_uri.constant_value.string_value = artifact_uri
return importer_spec
def build_importer_task_spec(
def _build_importer_task_spec(
importer_base_name: str,
artifact_uri: Union[_pipeline_param.PipelineParam, str],
) -> pipeline_spec_pb2.PipelineTaskSpec:
"""Builds an importer task spec.
Args:
importer_base_name: The base name of the importer node.
artifact_uri: The artifact uri to import from.
Returns:
An importer node task spec.
@ -69,21 +69,26 @@ def build_importer_task_spec(
result.component_ref.name = dsl_utils.sanitize_component_name(
importer_base_name)
if isinstance(artifact_uri, _pipeline_param.PipelineParam):
result.inputs.parameters[
INPUT_KEY].component_input_parameter = artifact_uri.full_name
elif isinstance(artifact_uri, str):
result.inputs.parameters[
INPUT_KEY].runtime_value.constant_value.string_value = artifact_uri
return result
def build_importer_component_spec(
def _build_importer_component_spec(
importer_base_name: str,
input_name: str,
input_type_schema: pipeline_spec_pb2.ArtifactTypeSchema,
artifact_type_schema: pipeline_spec_pb2.ArtifactTypeSchema,
) -> pipeline_spec_pb2.ComponentSpec:
"""Builds an importer component spec.
Args:
importer_base_name: The base name of the importer node.
dependent_task: The task requires importer node.
input_name: The name of the input artifact needs to be imported.
input_type_schema: The type of the input artifact.
artifact_type_schema: The user specified artifact type schema of the
artifact to be imported.
Returns:
An importer node component spec.
@ -91,25 +96,61 @@ def build_importer_component_spec(
result = pipeline_spec_pb2.ComponentSpec()
result.executor_label = dsl_utils.sanitize_executor_label(importer_base_name)
result.input_definitions.parameters[
input_name].type = pipeline_spec_pb2.PrimitiveType.STRING
result.output_definitions.artifacts[
OUTPUT_KEY].artifact_type.CopyFrom(input_type_schema)
INPUT_KEY].type = pipeline_spec_pb2.PrimitiveType.STRING
result.output_definitions.artifacts[OUTPUT_KEY].artifact_type.CopyFrom(
artifact_type_schema)
return result
def generate_importer_base_name(dependent_task_name: str,
input_name: str) -> str:
"""Generates the base name of an importer node.
The base name is formed by connecting the dependent task name and the input
artifact name. It's used to form task name, component ref, and executor label.
def importer(artifact_uri: Union[_pipeline_param.PipelineParam, str],
artifact_class: Type[io_types.Artifact],
reimport: bool = False) -> _container_op.ContainerOp:
"""dsl.importer for importing an existing artifact. Only for v2 pipeline.
Args:
dependent_task_name: The name of the task requires importer node.
input_name: The name of the input artifact needs to be imported.
artifact_uri: The artifact uri to import from.
artifact_class: The user specified artifact class of the artifact to be
imported. Its type schema is used to build the importer specs.
reimport: Whether to reimport the artifact. Defaults to False.
Returns:
A base importer node name.
A ContainerOp instance.
Raises:
ValueError if the passed in artifact_uri is neither a PipelineParam nor a
constant string value.
"""
return 'importer-{}-{}'.format(dependent_task_name, input_name)
if isinstance(artifact_uri, _pipeline_param.PipelineParam):
input_param = artifact_uri
elif isinstance(artifact_uri, str):
input_param = _pipeline_param.PipelineParam(
name='uri', value=artifact_uri, param_type='String')
else:
raise ValueError(
'Importer got unexpected artifact_uri: {} of type: {}.'.format(
artifact_uri, type(artifact_uri)))
old_warn_value = _container_op.ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING
_container_op.ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING = True
task = _container_op.ContainerOp(
name='importer',
image='importer_image', # TODO: need a v1 implementation of importer.
file_outputs={
OUTPUT_KEY: "{{{{$.outputs.artifacts['{}'].uri}}}}".format(OUTPUT_KEY)
},
)
_container_op.ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING = old_warn_value
artifact_type_schema = type_utils.get_artifact_type_schema(artifact_class)
task.importer_spec = _build_importer_spec(
artifact_uri=artifact_uri, artifact_type_schema=artifact_type_schema)
task.task_spec = _build_importer_task_spec(
importer_base_name=task.name, artifact_uri=artifact_uri)
task.component_spec = _build_importer_component_spec(
importer_base_name=task.name, artifact_type_schema=artifact_type_schema)
task.inputs = [input_param]
return task

View File

@ -12,125 +12,153 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from absl.testing import parameterized
import unittest
from kfp.dsl import _pipeline_param
from kfp.dsl import importer_node
from kfp.pipeline_spec import pipeline_spec_pb2 as pb
from google.protobuf import json_format
class ImporterNodeTest(unittest.TestCase):
class ImporterNodeTest(parameterized.TestCase):
def test_build_importer_task_spec(self):
expected_task = {
'taskInfo': {
'name': 'task-importer-task0-input1'
},
'componentRef': {
'name': 'comp-importer-task0-input1'
},
}
@parameterized.parameters(
{
# artifact_uri is a constant value
'input_uri':
'gs://artifact',
'artifact_type_schema':
pb.ArtifactTypeSchema(schema_title='system.Dataset'),
'expected_result': {
'artifactUri': {
'constantValue': {
'stringValue': 'gs://artifact'
}
},
'typeSchema': {
'schemaTitle': 'system.Dataset'
}
}
},
{
# artifact_uri is from PipelineParam
'input_uri':
_pipeline_param.PipelineParam(name='uri_to_import'),
'artifact_type_schema':
pb.ArtifactTypeSchema(schema_title='system.Model'),
'expected_result': {
'artifactUri': {
'runtimeParameter': 'uri'
},
'typeSchema': {
'schemaTitle': 'system.Model'
}
},
})
def test_build_importer_spec(self, input_uri, artifact_type_schema,
expected_result):
expected_importer_spec = pb.PipelineDeploymentConfig.ImporterSpec()
json_format.ParseDict(expected_result, expected_importer_spec)
importer_spec = importer_node._build_importer_spec(
artifact_uri=input_uri, artifact_type_schema=artifact_type_schema)
self.maxDiff = None
self.assertEqual(expected_importer_spec, importer_spec)
@parameterized.parameters(
{
# artifact_uri is a constant value
'importer_name': 'importer-1',
'input_uri': 'gs://artifact',
'expected_result': {
'taskInfo': {
'name': 'task-importer-1'
},
'inputs': {
'parameters': {
'uri': {
'runtimeValue': {
'constantValue': {
'stringValue': 'gs://artifact'
}
}
}
}
},
'componentRef': {
'name': 'comp-importer-1'
},
}
},
{
# artifact_uri is from PipelineParam
'importer_name': 'importer-2',
'input_uri': _pipeline_param.PipelineParam(name='uri_to_import'),
'expected_result': {
'taskInfo': {
'name': 'task-importer-2'
},
'inputs': {
'parameters': {
'uri': {
'componentInputParameter': 'uri_to_import'
}
}
},
'componentRef': {
'name': 'comp-importer-2'
},
},
})
def test_build_importer_task_spec(self, importer_name, input_uri,
expected_result):
expected_task_spec = pb.PipelineTaskSpec()
json_format.ParseDict(expected_task, expected_task_spec)
json_format.ParseDict(expected_result, expected_task_spec)
task_spec = importer_node.build_importer_task_spec(
importer_base_name='importer-task0-input1')
task_spec = importer_node._build_importer_task_spec(
importer_base_name=importer_name, artifact_uri=input_uri)
self.maxDiff = None
self.assertEqual(expected_task_spec, task_spec)
def test_build_importer_spec_from_pipeline_param(self):
expected_importer = {
'artifactUri': {
'runtimeParameter': 'param1'
},
'typeSchema': {
'schemaTitle': 'system.Artifact'
}
}
expected_importer_spec = pb.PipelineDeploymentConfig.ImporterSpec()
json_format.ParseDict(expected_importer, expected_importer_spec)
importer_spec = importer_node.build_importer_spec(
input_type_schema=pb.ArtifactTypeSchema(schema_title='system.Artifact'),
pipeline_param_name='param1')
self.maxDiff = None
self.assertEqual(expected_importer_spec, importer_spec)
def test_build_importer_spec_from_constant_value(self):
expected_importer = {
'artifactUri': {
'constantValue': {
'stringValue': 'some_uri'
}
},
'typeSchema': {
'schemaTitle': 'system.Artifact'
}
}
expected_importer_spec = pb.PipelineDeploymentConfig.ImporterSpec()
json_format.ParseDict(expected_importer, expected_importer_spec)
importer_spec = importer_node.build_importer_spec(
input_type_schema=pb.ArtifactTypeSchema(schema_title='system.Artifact'),
constant_value='some_uri')
self.maxDiff = None
self.assertEqual(expected_importer_spec, importer_spec)
def test_build_importer_spec_with_invalid_inputs_should_fail(self):
with self.assertRaisesRegex(
AssertionError,
'importer spec should be built using either pipeline_param_name or '
'constant_value'):
importer_node.build_importer_spec(
input_type_schema=pb.ArtifactTypeSchema(
schema_title='system.Artifact'),
pipeline_param_name='param1',
constant_value='some_uri')
with self.assertRaisesRegex(
AssertionError,
'importer spec should be built using either pipeline_param_name or '
'constant_value'):
importer_node.build_importer_spec(
input_type_schema=pb.ArtifactTypeSchema(
schema_title='system.Artifact'))
def test_build_importer_component_spec(self):
expected_importer_component = {
'inputDefinitions': {
'parameters': {
'input1': {
'uri': {
'type': 'STRING'
}
}
},
'outputDefinitions': {
'artifacts': {
'result': {
'artifact': {
'artifactType': {
'schemaTitle': 'system.Artifact'
}
}
}
},
'executorLabel': 'exec-importer-task0-input1'
'executorLabel': 'exec-importer-1'
}
expected_importer_comp_spec = pb.ComponentSpec()
json_format.ParseDict(expected_importer_component,
expected_importer_comp_spec)
importer_comp_spec = importer_node.build_importer_component_spec(
importer_base_name='importer-task0-input1',
input_name='input1',
input_type_schema=pb.ArtifactTypeSchema(schema_title='system.Artifact'))
importer_comp_spec = importer_node._build_importer_component_spec(
importer_base_name='importer-1',
artifact_type_schema=pb.ArtifactTypeSchema(
schema_title='system.Artifact'))
self.maxDiff = None
self.assertEqual(expected_importer_comp_spec, importer_comp_spec)
def test_generate_importer_base_name(self):
self.assertEqual(
'importer-task0-input1',
importer_node.generate_importer_base_name(
dependent_task_name='task0', input_name='input1'))
def test_import_with_invalid_artifact_uri_value_should_fail(self):
  # An int is neither a PipelineParam nor a str, so importer() must reject it
  # with a ValueError that names the offending value and its type.
  from kfp.dsl.io_types import Dataset

  expected_message = (
      "Importer got unexpected artifact_uri: 123 of type: <class 'int'>.")
  with self.assertRaisesRegex(ValueError, expected_message):
    importer_node.importer(artifact_uri=123, artifact_class=Dataset)
if __name__ == '__main__':

View File

@ -19,6 +19,7 @@ from kfp.pipeline_spec import pipeline_spec_pb2
from kfp.dsl import artifact_utils
from kfp.dsl import io_types
# ComponentSpec I/O types to DSL ontology artifact classes mapping.
_ARTIFACT_CLASSES_MAPPING = {
'model': io_types.Model,

View File

@ -34,7 +34,6 @@ from kfp.dsl import _pipeline_param
from kfp.v2.compiler import compiler_utils
from kfp.dsl import component_spec as dsl_component_spec
from kfp.dsl import dsl_utils
from kfp.dsl import importer_node
from kfp.dsl import io_types
from kfp.dsl import type_utils
from kfp.pipeline_spec import pipeline_spec_pb2
@ -796,44 +795,17 @@ class Compiler(object):
raise NotImplementedError(
'dsl.ExitHandler is not yet supported in KFP v2 compiler.')
importer_tasks = []
# Add importer node when applicable
for input_name in subgroup_task_spec.inputs.artifacts:
if not subgroup_task_spec.inputs.artifacts[
input_name].task_output_artifact.producer_task:
type_schema = type_utils.get_input_artifact_type_schema(
input_name, subgroup._metadata.inputs)
importer_name = importer_node.generate_importer_base_name(
dependent_task_name=subgroup_task_spec.task_info.name,
input_name=input_name)
importer_task_spec = importer_node.build_importer_task_spec(
importer_name)
importer_comp_spec = importer_node.build_importer_component_spec(
importer_base_name=importer_name,
input_name=input_name,
input_type_schema=type_schema)
importer_task_name = importer_task_spec.task_info.name
importer_comp_name = importer_task_spec.component_ref.name
importer_exec_label = importer_comp_spec.executor_label
if isinstance(subgroup, dsl.ContainerOp):
if hasattr(subgroup, 'importer_spec'):
importer_task_name = subgroup.task_spec.task_info.name
importer_comp_name = subgroup.task_spec.component_ref.name
importer_exec_label = subgroup.component_spec.executor_label
group_component_spec.dag.tasks[importer_task_name].CopyFrom(
importer_task_spec)
subgroup.task_spec)
pipeline_spec.components[importer_comp_name].CopyFrom(
importer_comp_spec)
subgroup_task_spec.inputs.artifacts[
input_name].task_output_artifact.producer_task = (
importer_task_name)
subgroup_task_spec.inputs.artifacts[
input_name].task_output_artifact.output_artifact_key = (
importer_node.OUTPUT_KEY)
# Retrieve the pre-built importer spec
importer_spec = subgroup.importer_specs[input_name]
subgroup.component_spec)
deployment_config.executors[importer_exec_label].importer.CopyFrom(
importer_spec)
importer_tasks.append(importer_task_name)
subgroup.importer_spec)
if is_loop_subgroup:
# Retrieve the real parent component, which is the compiler injected
@ -851,7 +823,7 @@ class Compiler(object):
tasks_in_current_dag = [
dsl_utils.sanitize_task_name(subgroup.name) for subgroup in subgroups
] + importer_tasks
]
input_parameters_in_current_dag = [
input_name

View File

@ -258,50 +258,6 @@ class CompilerTest(unittest.TestCase):
pipeline_func=my_pipeline,
package_path='output.json')
def test_compile_pipeline_with_importer_on_inputpath_should_raise_error(self):
# YAML component authoring
component_op = components.load_component_from_text("""
name: compoent with misused placeholder
inputs:
- {name: model, type: Model}
implementation:
container:
image: dummy
args:
- {inputPath: model}
""")
@dsl.pipeline(name='test-pipeline', pipeline_root='dummy_root')
def my_pipeline(model):
component_op(model=model)
with self.assertRaisesRegex(
TypeError,
'Input "model" with type "Model" is not connected to any upstream '
'output. However it is used with InputPathPlaceholder.'):
compiler.Compiler().compile(
pipeline_func=my_pipeline,
package_path='output.json')
# Python function based component authoring
def my_component(datasets: components.InputPath('Datasets')):
pass
component_op = components.create_component_from_func(my_component)
@dsl.pipeline(name='test-pipeline', pipeline_root='dummy_root')
def my_pipeline(datasets):
component_op(datasets=datasets)
with self.assertRaisesRegex(
TypeError,
'Input "datasets" with type "Datasets" is not connected to any upstream '
'output. However it is used with InputPathPlaceholder.'):
compiler.Compiler().compile(
pipeline_func=my_pipeline,
package_path='output.json')
def test_set_pipeline_root_through_pipeline_decorator(self):
tmpdir = tempfile.mkdtemp()

View File

@ -55,14 +55,14 @@ class CompilerCliTests(unittest.TestCase):
finally:
shutil.rmtree(tmpdir)
def test_two_step_pipeline_with_importer(self):
self._test_compile_py_to_json('two_step_pipeline_with_importer')
def test_simple_pipeline_without_importer(self):
def test_two_step_pipeline(self):
self._test_compile_py_to_json(
'simple_pipeline_without_importer',
'two_step_pipeline',
['--pipeline-parameters', '{"text":"Hello KFP!"}'])
def test_pipeline_with_importer(self):
self._test_compile_py_to_json('pipeline_with_importer')
def test_pipeline_with_ontology(self):
self._test_compile_py_to_json('pipeline_with_ontology')

File diff suppressed because one or more lines are too long

View File

@ -0,0 +1,61 @@
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pipeline using dsl.importer."""
from typing import NamedTuple
from kfp import components
from kfp import dsl
from kfp.dsl.io_types import Dataset, Model, InputArtifact
from kfp.v2 import compiler
def train(dataset: InputArtifact(Dataset)) -> NamedTuple(
    'Outputs', [
        ('scalar', str),
        ('model', Model),
    ]):
  """Dummy Training step."""
  # Imported inside the function rather than at module level -- presumably so
  # the function body stays self-contained when packaged as a component;
  # confirm before hoisting.
  from collections import namedtuple

  # Read the entire dataset from the local path of the input artifact.
  with open(dataset.path, 'r') as dataset_file:
    contents = dataset_file.read()
  print('Dataset:', contents)

  # Return a fixed scalar together with a description echoing the data read.
  result_type = namedtuple('Outputs', ['scalar', 'model'])
  return result_type(
      '123', 'My model trained using data: {}'.format(contents))
# Component factory built from the `train` function; it is called inside the
# pipeline definition to create training tasks.
train_op = components.create_component_from_func_v2(train)
@dsl.pipeline(name='pipeline-with-importer', pipeline_root='dummy_root')
def my_pipeline(dataset2: str = 'gs://ml-pipeline-playground/shakespeare2.txt'):
  # First training task: its dataset is imported from a constant URI.
  constant_uri_import = dsl.importer(
      artifact_uri='gs://ml-pipeline-playground/shakespeare1.txt',
      artifact_class=Dataset,
      reimport=False,
  )
  first_training = train_op(dataset=constant_uri_import.output)

  # Second training task: created under a condition on the first task's
  # 'scalar' output, with its dataset imported from the `dataset2` pipeline
  # parameter.
  scalar_matches = first_training.outputs['scalar'] == '123'
  with dsl.Condition(scalar_matches):
    parameter_uri_import = dsl.importer(
        artifact_uri=dataset2,
        artifact_class=Dataset,
        reimport=True,
    )
    train_op(dataset=parameter_uri_import.output)
if __name__ == '__main__':
  # Write the compiled pipeline to this script's path with '.py' replaced by
  # '.json'.
  output_path = __file__.replace('.py', '.json')
  compiler.Compiler().compile(
      pipeline_func=my_pipeline,
      package_path=output_path,
  )

View File

@ -1,213 +1,126 @@
{
"pipelineSpec": {
"schemaVersion": "2.0.0",
"root": {
"dag": {
"tasks": {
"task-importer-task-upstream-input-6": {
"taskInfo": {
"name": "task-importer-task-upstream-input-6"
},
"componentRef": {
"name": "comp-importer-task-upstream-input-6"
"components": {
"comp-upstream": {
"outputDefinitions": {
"parameters": {
"output_1": {
"type": "INT"
}
},
"task-importer-task-upstream-input-8": {
"taskInfo": {
"name": "task-importer-task-upstream-input-8"
},
"componentRef": {
"name": "comp-importer-task-upstream-input-8"
}
},
"task-downstream": {
"componentRef": {
"name": "comp-downstream"
},
"taskInfo": {
"name": "task-downstream"
},
"inputs": {
"artifacts": {
"input_b": {
"taskOutputArtifact": {
"producerTask": "task-upstream",
"outputArtifactKey": "output_2"
}
},
"input_c": {
"taskOutputArtifact": {
"producerTask": "task-upstream",
"outputArtifactKey": "output_3"
}
}
},
"parameters": {
"input_a": {
"taskOutputParameter": {
"producerTask": "task-upstream",
"outputParameterKey": "output_1"
}
}
"artifacts": {
"output_3": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
},
"dependentTasks": [
"task-upstream"
]
},
"task-importer-task-upstream-input-4": {
"taskInfo": {
"name": "task-importer-task-upstream-input-4"
},
"componentRef": {
"name": "comp-importer-task-upstream-input-4"
}
},
"task-upstream": {
"inputs": {
"parameters": {
"input_1": {
"componentInputParameter": "input1"
},
"input_2": {
"runtimeValue": {
"constantValue": {
"doubleValue": 3.1415926
}
}
}
},
"artifacts": {
"input_6": {
"taskOutputArtifact": {
"outputArtifactKey": "result",
"producerTask": "task-importer-task-upstream-input-6"
}
},
"input_5": {
"taskOutputArtifact": {
"producerTask": "task-importer-task-upstream-input-5",
"outputArtifactKey": "result"
}
},
"input_4": {
"taskOutputArtifact": {
"outputArtifactKey": "result",
"producerTask": "task-importer-task-upstream-input-4"
}
},
"input_3": {
"taskOutputArtifact": {
"outputArtifactKey": "result",
"producerTask": "task-importer-task-upstream-input-3"
}
},
"input_8": {
"taskOutputArtifact": {
"producerTask": "task-importer-task-upstream-input-8",
"outputArtifactKey": "result"
}
},
"input_7": {
"taskOutputArtifact": {
"outputArtifactKey": "result",
"producerTask": "task-importer-task-upstream-input-7"
}
}
"output_7": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
},
"componentRef": {
"name": "comp-upstream"
"output_2": {
"artifactType": {
"schemaTitle": "system.Model"
}
},
"taskInfo": {
"name": "task-upstream"
"output_6": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
},
"output_5": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
},
"output_4": {
"artifactType": {
"schemaTitle": "system.Model"
}
}
}
},
"executorLabel": "exec-upstream",
"inputDefinitions": {
"parameters": {
"input_2": {
"type": "DOUBLE"
},
"input_1": {
"type": "STRING"
}
},
"task-importer-task-upstream-input-7": {
"taskInfo": {
"name": "task-importer-task-upstream-input-7"
"artifacts": {
"input_4": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
},
"componentRef": {
"name": "comp-importer-task-upstream-input-7"
}
},
"task-importer-task-upstream-input-5": {
"taskInfo": {
"name": "task-importer-task-upstream-input-5"
},
"componentRef": {
"name": "comp-importer-task-upstream-input-5"
}
},
"task-importer-task-upstream-input-3": {
"taskInfo": {
"name": "task-importer-task-upstream-input-3"
},
"componentRef": {
"name": "comp-importer-task-upstream-input-3"
"input_3": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
}
}
}
},
"inputDefinitions": {
"artifacts": {
"input6": {
"artifactType": {
"schemaTitle": "system.Artifact"
"comp-downstream": {
"executorLabel": "exec-downstream",
"inputDefinitions": {
"parameters": {
"input_a": {
"type": "INT"
}
},
"input8": {
"artifactType": {
"schemaTitle": "system.Artifact"
"artifacts": {
"input_g": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
},
"input_b": {
"artifactType": {
"schemaTitle": "system.Model"
}
},
"input_c": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
},
"input_d": {
"artifactType": {
"schemaTitle": "system.Model"
}
},
"input_f": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
},
"input_e": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
}
},
"input3": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
},
"input7": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
},
"input5": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
},
"input4": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
}
},
"parameters": {
"input1": {
"type": "STRING"
}
}
}
},
"deploymentSpec": {
"executors": {
"exec-importer-task-upstream-input-3": {
"importer": {
"typeSchema": {
"schemaTitle": "system.Artifact"
},
"artifactUri": {
"runtimeParameter": "input3"
}
}
},
"exec-downstream": {
"container": {
"image": "gcr.io/image",
"args": [
"{{$.inputs.parameters['input_a']}}",
"{{$.inputs.artifacts['input_b'].uri}}",
"{{$.inputs.artifacts['input_c'].path}}"
"{{$.inputs.artifacts['input_c'].path}}",
"{{$.inputs.artifacts['input_d'].uri}}",
"{{$.inputs.artifacts['input_e'].uri}}",
"{{$.inputs.artifacts['input_f'].path}}",
"{{$.inputs.artifacts['input_g'].path}}"
]
}
},
@ -218,293 +131,132 @@
"{{$.inputs.parameters['input_2']}}",
"{{$.inputs.artifacts['input_3'].uri}}",
"{{$.inputs.artifacts['input_4'].uri}}",
"{{$.inputs.artifacts['input_5'].uri}}",
"{{$.inputs.artifacts['input_6'].uri}}",
"{{$.inputs.artifacts['input_7'].uri}}",
"{{$.inputs.artifacts['input_8'].uri}}",
"{{$.outputs.parameters['output_1'].output_file}}",
"{{$.outputs.artifacts['output_2'].uri}}",
"{{$.outputs.artifacts['output_3'].path}}"
"{{$.outputs.artifacts['output_3'].path}}",
"{{$.outputs.artifacts['output_4'].uri}}",
"{{$.outputs.artifacts['output_5'].uri}}",
"{{$.outputs.artifacts['output_6'].path}}",
"{{$.outputs.artifacts['output_7'].path}}"
],
"image": "gcr.io/image"
}
},
"exec-importer-task-upstream-input-4": {
"importer": {
"typeSchema": {
"schemaTitle": "system.Artifact"
},
"artifactUri": {
"runtimeParameter": "input4"
}
}
}
},
"schemaVersion": "2.0.0",
"sdkVersion": "kfp-1.5.0-rc.3",
"root": {
"inputDefinitions": {
"parameters": {
"input1": {
"type": "STRING"
}
},
"exec-importer-task-upstream-input-7": {
"importer": {
"artifactUri": {
"runtimeParameter": "input7"
},
"typeSchema": {
"artifacts": {
"input4": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
},
"input3": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
}
},
"exec-importer-task-upstream-input-5": {
"importer": {
"typeSchema": {
"schemaTitle": "system.Model"
},
"artifactUri": {
"constantValue": {
"stringValue": "gs://bucket/metrics"
}
},
"dag": {
"tasks": {
"task-upstream": {
"inputs": {
"parameters": {
"input_2": {
"runtimeValue": {
"constantValue": {
"doubleValue": 3.1415926
}
}
},
"input_1": {
"componentInputParameter": "input1"
}
}
}
}
},
"exec-importer-task-upstream-input-6": {
"importer": {
"typeSchema": {
"schemaTitle": "system.Artifact"
},
"artifactUri": {
"runtimeParameter": "input6"
}
}
},
"exec-importer-task-upstream-input-8": {
"importer": {
"artifactUri": {
"runtimeParameter": "input8"
"taskInfo": {
"name": "task-upstream"
},
"typeSchema": {
"schemaTitle": "system.Artifact"
"componentRef": {
"name": "comp-upstream"
}
},
"task-downstream": {
"inputs": {
"artifacts": {
"input_e": {
"taskOutputArtifact": {
"outputArtifactKey": "output_5",
"producerTask": "task-upstream"
}
},
"input_f": {
"taskOutputArtifact": {
"producerTask": "task-upstream",
"outputArtifactKey": "output_6"
}
},
"input_c": {
"taskOutputArtifact": {
"producerTask": "task-upstream",
"outputArtifactKey": "output_3"
}
},
"input_g": {
"taskOutputArtifact": {
"outputArtifactKey": "output_7",
"producerTask": "task-upstream"
}
},
"input_b": {
"taskOutputArtifact": {
"outputArtifactKey": "output_2",
"producerTask": "task-upstream"
}
},
"input_d": {
"taskOutputArtifact": {
"producerTask": "task-upstream",
"outputArtifactKey": "output_4"
}
}
},
"parameters": {
"input_a": {
"taskOutputParameter": {
"outputParameterKey": "output_1",
"producerTask": "task-upstream"
}
}
}
},
"componentRef": {
"name": "comp-downstream"
},
"taskInfo": {
"name": "task-downstream"
},
"dependentTasks": [
"task-upstream"
]
}
}
}
},
"pipelineInfo": {
"name": "pipeline-with-various-types"
},
"sdkVersion": "kfp-1.5.0-rc.2",
"components": {
"comp-importer-task-upstream-input-7": {
"inputDefinitions": {
"parameters": {
"input_7": {
"type": "STRING"
}
}
},
"outputDefinitions": {
"artifacts": {
"result": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
}
}
},
"executorLabel": "exec-importer-task-upstream-input-7"
},
"comp-upstream": {
"inputDefinitions": {
"parameters": {
"input_1": {
"type": "STRING"
},
"input_2": {
"type": "DOUBLE"
}
},
"artifacts": {
"input_8": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
},
"input_6": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
},
"input_4": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
},
"input_5": {
"artifactType": {
"schemaTitle": "system.Model"
}
},
"input_7": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
},
"input_3": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
}
}
},
"outputDefinitions": {
"artifacts": {
"output_3": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
},
"output_2": {
"artifactType": {
"schemaTitle": "system.Model"
}
}
},
"parameters": {
"output_1": {
"type": "INT"
}
}
},
"executorLabel": "exec-upstream"
},
"comp-importer-task-upstream-input-4": {
"executorLabel": "exec-importer-task-upstream-input-4",
"outputDefinitions": {
"artifacts": {
"result": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
}
}
},
"inputDefinitions": {
"parameters": {
"input_4": {
"type": "STRING"
}
}
}
},
"comp-downstream": {
"executorLabel": "exec-downstream",
"inputDefinitions": {
"artifacts": {
"input_c": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
},
"input_b": {
"artifactType": {
"schemaTitle": "system.Model"
}
}
},
"parameters": {
"input_a": {
"type": "INT"
}
}
}
},
"comp-importer-task-upstream-input-5": {
"executorLabel": "exec-importer-task-upstream-input-5",
"inputDefinitions": {
"parameters": {
"input_5": {
"type": "STRING"
}
}
},
"outputDefinitions": {
"artifacts": {
"result": {
"artifactType": {
"schemaTitle": "system.Model"
}
}
}
}
},
"comp-importer-task-upstream-input-6": {
"executorLabel": "exec-importer-task-upstream-input-6",
"outputDefinitions": {
"artifacts": {
"result": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
}
}
},
"inputDefinitions": {
"parameters": {
"input_6": {
"type": "STRING"
}
}
}
},
"comp-importer-task-upstream-input-3": {
"executorLabel": "exec-importer-task-upstream-input-3",
"inputDefinitions": {
"parameters": {
"input_3": {
"type": "STRING"
}
}
},
"outputDefinitions": {
"artifacts": {
"result": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
}
}
}
},
"comp-importer-task-upstream-input-8": {
"executorLabel": "exec-importer-task-upstream-input-8",
"inputDefinitions": {
"parameters": {
"input_8": {
"type": "STRING"
}
}
},
"outputDefinitions": {
"artifacts": {
"result": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
}
}
}
}
}
},
"runtimeConfig": {
"parameters": {
"input7": {
"stringValue": "arbitrary value"
},
"input5": {
"stringValue": "gs://bucket/model"
},
"input6": {
"stringValue": "gs://bucket/dataset"
},
"input8": {
"stringValue": "gs://path2"
}
},
"gcsOutputDirectory": "dummy_root"
}
}
}

View File

@ -25,14 +25,14 @@ inputs:
- {name: input_2, type: Float}
- {name: input_3, type: }
- {name: input_4}
- {name: input_5, type: Model}
- {name: input_6, type: Datasets}
- {name: input_7, type: Some arbitrary type}
- {name: input_8, type: {GcsPath: {data_type: TSV}}}
outputs:
- {name: output_1, type: Integer}
- {name: output_2, type: Model}
- {name: output_3}
- {name: output_4, type: Model}
- {name: output_5, type: Datasets}
- {name: output_6, type: Some arbitrary type}
- {name: output_7, type: {GcsPath: {data_type: TSV}}}
implementation:
container:
image: gcr.io/image
@ -41,13 +41,13 @@ implementation:
- {inputValue: input_2}
- {inputUri: input_3}
- {inputUri: input_4}
- {inputUri: input_5}
- {inputUri: input_6}
- {inputUri: input_7}
- {inputUri: input_8}
- {outputPath: output_1}
- {outputUri: output_2}
- {outputPath: output_3}
- {outputUri: output_4}
- {outputUri: output_5}
- {outputPath: output_6}
- {outputPath: output_7}
""")
component_op_2 = components.load_component_from_text("""
@ -56,6 +56,10 @@ inputs:
- {name: input_a, type: Integer}
- {name: input_b, type: Model}
- {name: input_c}
- {name: input_d, type: Model}
- {name: input_e, type: Datasets}
- {name: input_f, type: Some arbitrary type}
- {name: input_g, type: {GcsPath: {data_type: TSV}}}
implementation:
container:
image: gcr.io/image
@ -63,30 +67,32 @@ implementation:
- {inputValue: input_a}
- {inputUri: input_b}
- {inputPath: input_c}
- {inputUri: input_d}
- {inputUri: input_e}
- {inputPath: input_f}
- {inputPath: input_g}
""")
@dsl.pipeline(name='pipeline-with-various-types', pipeline_root='dummy_root')
def my_pipeline(input1: str,
input3,
input4='',
input5='gs://bucket/model',
input6='gs://bucket/dataset',
input7='arbitrary value',
input8='gs://path2'):
input4=''):
component_1 = component_op_1(
input_1=input1,
input_2=3.1415926,
input_3=input3,
input_4=input4,
input_5='gs://bucket/metrics',
input_6=input6,
input_7=input7,
input_8=input8)
)
component_2 = component_op_2(
input_a=component_1.outputs['output_1'],
input_b=component_1.outputs['output_2'],
input_c=component_1.outputs['output_3'])
input_c=component_1.outputs['output_3'],
input_d=component_1.outputs['output_4'],
input_e=component_1.outputs['output_5'],
input_f=component_1.outputs['output_6'],
input_g=component_1.outputs['output_7'],
)
if __name__ == '__main__':

View File

@ -1,223 +0,0 @@
{
"pipelineSpec": {
"sdkVersion": "kfp-1.3.0",
"root": {
"dag": {
"tasks": {
"task-trainer": {
"componentRef": {
"name": "comp-trainer"
},
"taskInfo": {
"name": "task-trainer"
},
"inputs": {
"parameters": {
"num_epochs": {
"componentInputParameter": "epochs"
},
"train_optimizer": {
"componentInputParameter": "optimizer"
}
},
"artifacts": {
"input_location": {
"taskOutputArtifact": {
"producerTask": "task-importer-task-trainer-input-location",
"outputArtifactKey": "result"
}
}
}
}
},
"task-serving": {
"dependentTasks": [
"task-trainer"
],
"taskInfo": {
"name": "task-serving"
},
"componentRef": {
"name": "comp-serving"
},
"inputs": {
"parameters": {
"model_cfg": {
"taskOutputParameter": {
"producerTask": "task-trainer",
"outputParameterKey": "model_config"
}
}
},
"artifacts": {
"model": {
"taskOutputArtifact": {
"producerTask": "task-trainer",
"outputArtifactKey": "model_output"
}
}
}
}
},
"task-importer-task-trainer-input-location": {
"taskInfo": {
"name": "task-importer-task-trainer-input-location"
},
"componentRef": {
"name": "comp-importer-task-trainer-input-location"
}
}
}
},
"inputDefinitions": {
"artifacts": {
"input_gcs": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
}
},
"parameters": {
"epochs": {
"type": "INT"
},
"optimizer": {
"type": "STRING"
}
}
}
},
"pipelineInfo": {
"name": "two-step-pipeline-with-importer"
},
"deploymentSpec": {
"executors": {
"exec-importer-task-trainer-input-location": {
"importer": {
"artifactUri": {
"runtimeParameter": "input_gcs"
},
"typeSchema": {
"schemaTitle": "system.Artifact"
}
}
},
"exec-serving": {
"container": {
"args": [
"--model-to-serve",
"{{$.inputs.artifacts['model'].uri}}",
"--model-config",
"{{$.inputs.parameters['model_cfg']}}"
],
"image": "gcr.io/my-project/my-server"
}
},
"exec-trainer": {
"container": {
"image": "gcr.io/my-project/my-training",
"args": [
"--input-location",
"{{$.inputs.artifacts['input_location'].uri}}",
"--optimizer",
"{{$.inputs.parameters['train_optimizer']}}",
"--epochs",
"{{$.inputs.parameters['num_epochs']}}",
"--model",
"{{$.outputs.artifacts['model_output'].uri}}",
"--model-config",
"{{$.outputs.parameters['model_config'].output_file}}"
]
}
}
}
},
"components": {
"comp-serving": {
"inputDefinitions": {
"parameters": {
"model_cfg": {
"type": "STRING"
}
},
"artifacts": {
"model": {
"artifactType": {
"schemaTitle": "system.Model"
}
}
}
},
"executorLabel": "exec-serving"
},
"comp-trainer": {
"executorLabel": "exec-trainer",
"outputDefinitions": {
"artifacts": {
"model_output": {
"artifactType": {
"schemaTitle": "system.Model"
}
}
},
"parameters": {
"model_config": {
"type": "STRING"
}
}
},
"inputDefinitions": {
"parameters": {
"num_epochs": {
"type": "INT"
},
"train_optimizer": {
"type": "STRING"
}
},
"artifacts": {
"input_location": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
}
}
}
},
"comp-importer-task-trainer-input-location": {
"executorLabel": "exec-importer-task-trainer-input-location",
"inputDefinitions": {
"parameters": {
"input_location": {
"type": "STRING"
}
}
},
"outputDefinitions": {
"artifacts": {
"result": {
"artifactType": {
"schemaTitle": "system.Artifact"
}
}
}
}
}
},
"schemaVersion": "2.0.0"
},
"runtimeConfig": {
"parameters": {
"epochs": {
"intValue": "200"
},
"optimizer": {
"stringValue": "sgd"
},
"input_gcs": {
"stringValue": "gs://test-bucket/pipeline_root"
}
},
"gcsOutputDirectory": "dummy_root"
}
}

View File

@ -1,45 +0,0 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import pathlib
from kfp import components
from kfp.v2 import dsl
import kfp.v2.compiler as compiler
# Component YAML definitions are read from the `component_yaml` directory that
# sits next to this file.
test_data_dir = pathlib.Path(__file__).parent.joinpath('component_yaml')

# Load the two components wired together by the pipeline below.
trainer_op = components.load_component_from_file(
    str(test_data_dir.joinpath('trainer_component.yaml')))
serving_op = components.load_component_from_file(
    str(test_data_dir.joinpath('serving_component.yaml')))
# Two-step sample pipeline: a trainer task whose outputs feed a serving task.
#
# `input_gcs` carries no type annotation, while `optimizer` and `epochs` are
# annotated as `str` and `int`.
# NOTE(review): the type hints presumably drive how the compiler classifies
# each pipeline input -- confirm before adding an annotation to `input_gcs`.
@dsl.pipeline(
    name='two-step-pipeline-with-importer',
    pipeline_root='dummy_root',
    description='A linear two-step pipeline.',
)
def my_pipeline(
    input_gcs='gs://test-bucket/pipeline_root',
    optimizer: str = 'sgd',
    epochs: int = 200,
):
  # Step 1: the trainer receives all three pipeline inputs.
  training_task = trainer_op(
      input_location=input_gcs,
      train_optimizer=optimizer,
      num_epochs=epochs,
  )

  # Step 2: the serving step consumes the trainer's model and model config
  # outputs.
  serving_task = serving_op(
      model=training_task.outputs['model_output'],
      model_cfg=training_task.outputs['model_config'],
  )
if __name__ == '__main__':
  # Compile the pipeline and write the result next to this script, using the
  # script's own path with the extension swapped for `.json`.
  #
  # `with_suffix` changes only the final suffix of the path. The previous
  # `__file__.replace('.py', '.json')` rewrote every '.py' substring, so a
  # directory name containing '.py' (or a '.pyc' file name) produced a wrong
  # output path.
  compiler.Compiler().compile(
      pipeline_func=my_pipeline,
      package_path=str(pathlib.Path(__file__).with_suffix('.json')))