From 0fc68bbdd45f801e6add34c46435c4ef9566790a Mon Sep 17 00:00:00 2001
From: Alexey Volkov
Date: Wed, 28 Aug 2019 21:09:57 -0700
Subject: [PATCH] SDK - Added support for raw input artifact argument values to ContainerOp (#791)

* SDK - Added support for raw artifact values to ContainerOp

* `ContainerOp` now gets artifact arguments from the command line instead of the constructor.

* Added back input_artifact_arguments to the ContainerOp constructor.

In some scenarios it's hard to provide the artifact arguments through the `command` list when it already has resolved artifact paths.

* Exporting InputArtifactArgument from kfp.dsl

* Updated the sample

* Properly passing artifact arguments as task arguments as opposed to default input values.

* Renamed input_artifact_arguments to artifact_arguments to reduce confusion

* Renamed InputArtifactArgument to InputArgumentPath

Also renamed input_artifact_arguments to artifact_argument_paths in the ContainerOp constructor

* Replaced getattr with isinstance checks.

getattr is too fragile and can be broken by renames.

* Fixed the type annotations

* Unlocked the input artifact support in components

Added the test_input_path_placeholder_with_constant_argument test
---
 sdk/python/kfp/compiler/_op_to_template.py  |  24 +++-
 sdk/python/kfp/compiler/compiler.py         |  12 ++
 sdk/python/kfp/components/_dsl_bridge.py    |  16 ++-
 sdk/python/kfp/dsl/__init__.py              |   2 +-
 sdk/python/kfp/dsl/_container_op.py         |  38 ++++++
 sdk/python/tests/compiler/compiler_tests.py |   4 +
 .../testdata/input_artifact_raw_value.py    |  69 +++++++++++
 .../testdata/input_artifact_raw_value.txt   |   1 +
 .../testdata/input_artifact_raw_value.yaml  | 115 ++++++++++++++++++
 .../tests/components/test_components.py     |  17 +++
 10 files changed, 289 insertions(+), 9 deletions(-)
 create mode 100644 sdk/python/tests/compiler/testdata/input_artifact_raw_value.py
 create mode 100644 sdk/python/tests/compiler/testdata/input_artifact_raw_value.txt
 create mode 100644 sdk/python/tests/compiler/testdata/input_artifact_raw_value.yaml

diff --git a/sdk/python/kfp/compiler/_op_to_template.py b/sdk/python/kfp/compiler/_op_to_template.py
index 52941f6476..ab8a8417f4 100644
--- a/sdk/python/kfp/compiler/_op_to_template.py
+++ b/sdk/python/kfp/compiler/_op_to_template.py
@@ -127,11 +127,26 @@ def _parameters_to_json(params: List[dsl.PipelineParam]):
     return params
 
 
-# TODO: artifacts?
-def _inputs_to_json(inputs_params: List[dsl.PipelineParam], _artifacts=None):
+def _inputs_to_json(
+    inputs_params: List[dsl.PipelineParam],
+    input_artifact_paths: Dict[str, str] = None,
+) -> Dict[str, Dict]:
     """Converts a list of PipelineParam into an argo `inputs` JSON obj."""
     parameters = _parameters_to_json(inputs_params)
-    return {'parameters': parameters} if parameters else None
+
+    # Building the input artifacts section
+    artifacts = []
+    for name, path in (input_artifact_paths or {}).items():
+        artifact = {'name': name, 'path': path}
+        artifacts.append(artifact)
+    artifacts.sort(key=lambda x: x['name']) #Stabilizing the input artifact ordering
+
+    inputs_dict = {}
+    if parameters:
+        inputs_dict['parameters'] = parameters
+    if artifacts:
+        inputs_dict['artifacts'] = artifacts
+    return inputs_dict
 
 
 def _outputs_to_json(op: BaseOp,
@@ -213,7 +228,8 @@ def _op_to_template(op: BaseOp):
     }
 
     # inputs
-    inputs = _inputs_to_json(processed_op.inputs)
+    input_artifact_paths = processed_op.input_artifact_paths if isinstance(processed_op, dsl.ContainerOp) else None
+    inputs = _inputs_to_json(processed_op.inputs, input_artifact_paths)
     if inputs:
         template['inputs'] = inputs
 
diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py
index 90eca7a021..83c295b010 100644
--- a/sdk/python/kfp/compiler/compiler.py
+++ b/sdk/python/kfp/compiler/compiler.py
@@ -515,6 +515,18 @@ class Compiler(object):
            })
        arguments.sort(key=lambda x: x['name'])
        task['arguments'] = {'parameters': arguments}
+
+      if isinstance(sub_group, dsl.ContainerOp) and sub_group.artifact_arguments:
+        artifact_argument_structs = []
+        for input_name, argument in sub_group.artifact_arguments.items():
+          artifact_argument_dict = {'name': input_name}
+          if isinstance(argument, str):
+            artifact_argument_dict['raw'] = {'data': str(argument)}
+          else:
+            raise TypeError('Argument "{}" was passed to the artifact input "{}", but only constant strings are supported at this moment.'.format(str(argument), input_name))
+          artifact_argument_structs.append(artifact_argument_dict)
+        task.setdefault('arguments', {})['artifacts'] = artifact_argument_structs
+
      tasks.append(task)
    tasks.sort(key=lambda x: x['name'])
    template['dag'] = {'tasks': tasks}
diff --git a/sdk/python/kfp/components/_dsl_bridge.py b/sdk/python/kfp/components/_dsl_bridge.py
index 750789f95d..f0e234e9ec 100644
--- a/sdk/python/kfp/components/_dsl_bridge.py
+++ b/sdk/python/kfp/components/_dsl_bridge.py
@@ -15,7 +15,7 @@
 from collections import OrderedDict
 from typing import Mapping
 from ._structures import ContainerImplementation, ConcatPlaceholder, IfPlaceholder, InputValuePlaceholder, InputPathPlaceholder, IsPresentPlaceholder, OutputPathPlaceholder, TaskSpec
-from ._components import _generate_output_file_name, _default_component_name
+from ._components import _generate_input_file_name, _generate_output_file_name, _default_component_name
 from kfp.dsl._metadata import ComponentMeta, ParameterMeta
 
 def create_container_op_from_task(task_spec: TaskSpec):
@@ -34,6 +34,9 @@ def create_container_op_from_task(task_spec: TaskSpec):
         if output.name in unconfigurable_output_paths:
             output_paths[output.name] = unconfigurable_output_paths[output.name]
 
+    input_paths = OrderedDict()
+    artifact_arguments = OrderedDict()
+
     def expand_command_part(arg): #input values with original names
         #(Union[str,Mapping[str, Any]]) -> Union[str,List[str]]
         if arg is None:
@@ -57,8 +60,10 @@ def create_container_op_from_task(task_spec: TaskSpec):
             input_name = arg.input_name
             input_value = argument_values.get(input_name, None)
             if input_value is not None:
-                raise ValueError('ContainerOp does not support input artifacts - input {}'.format(input_name))
-                #return input_value
+                input_path = _generate_input_file_name(input_name)
+                input_paths[input_name] = input_path
+                artifact_arguments[input_name] = input_value
+                return input_path
             else:
                 input_spec = inputs_dict[input_name]
                 if input_spec.optional:
@@ -123,13 +128,15 @@ def create_container_op_from_task(task_spec: TaskSpec):
         container_image=container_spec.image,
         command=expanded_command,
         arguments=expanded_args,
+        input_paths=input_paths,
         output_paths=output_paths,
+        artifact_arguments=artifact_arguments,
         env=container_spec.env,
         component_spec=component_spec,
     )
 
 
-def _create_container_op_from_resolved_task(name:str, container_image:str, command=None, arguments=None, output_paths=None, env : Mapping[str, str]=None, component_spec=None):
+def _create_container_op_from_resolved_task(name:str, container_image:str, command=None, arguments=None, input_paths=None, artifact_arguments=None, output_paths=None, env : Mapping[str, str]=None, component_spec=None):
     from .. import dsl
 
     #Renaming outputs to conform with ContainerOp/Argo
@@ -154,6 +161,7 @@ def _create_container_op_from_resolved_task(name:str, container_image:str, comma
         command=command,
         arguments=arguments,
         file_outputs=output_paths_for_container_op,
+        artifact_argument_paths=[dsl.InputArgumentPath(argument=artifact_arguments[input_name], input=input_name, path=path) for input_name, path in input_paths.items()],
     )
 
     task._set_metadata(component_meta)
diff --git a/sdk/python/kfp/dsl/__init__.py b/sdk/python/kfp/dsl/__init__.py
index 487feccc9f..fb12902583 100644
--- a/sdk/python/kfp/dsl/__init__.py
+++ b/sdk/python/kfp/dsl/__init__.py
@@ -15,7 +15,7 @@
 
 from ._pipeline_param import PipelineParam, match_serialized_pipelineparam
 from ._pipeline import Pipeline, pipeline, get_pipeline_conf
-from ._container_op import ContainerOp, UserContainer, Sidecar
+from ._container_op import ContainerOp, InputArgumentPath, UserContainer, Sidecar
 from ._resource_op import ResourceOp
 from ._volume_op import (
     VolumeOp, VOLUME_MODE_RWO, VOLUME_MODE_RWM, VOLUME_MODE_ROM
diff --git a/sdk/python/kfp/dsl/_container_op.py b/sdk/python/kfp/dsl/_container_op.py
index 87f8287211..87ca12f923 100644
--- a/sdk/python/kfp/dsl/_container_op.py
+++ b/sdk/python/kfp/dsl/_container_op.py
@@ -903,6 +903,13 @@ class BaseOp(object):
 from ._pipeline_volume import PipelineVolume # The import is here to prevent circular reference problems.
 
 
+class InputArgumentPath:
+    def __init__(self, argument, input=None, path=None):
+        self.argument = argument
+        self.input = input
+        self.path = path
+
+
 class ContainerOp(BaseOp):
     """
     Represents an op implemented by a container image.
@@ -961,6 +968,7 @@ class ContainerOp(BaseOp):
                  init_containers: List[UserContainer] = None,
                  sidecars: List[Sidecar] = None,
                  container_kwargs: Dict = None,
+                 artifact_argument_paths: List[InputArgumentPath] = None,
                  file_outputs: Dict[str, str] = None,
                  output_artifact_paths : Dict[str, str]=None,
                  artifact_location: V1alpha1ArtifactLocation=None,
@@ -984,6 +992,10 @@ class ContainerOp(BaseOp):
              together with the `main` container.
          container_kwargs: the dict of additional keyword arguments to pass to
              the op's `Container` definition.
+         artifact_argument_paths: Optional. Maps input artifact arguments (values or references) to the local file paths where they'll be placed.
+             At pipeline run time, the value of the artifact argument is saved to a local file with specified path.
+             This parameter is only needed when the input file paths are hard-coded in the program.
+             Otherwise it's better to pass input artifact placement paths by including artifact arguments in the command-line using the InputArgumentPath class instances.
          file_outputs: Maps output labels to local file paths. At pipeline run time,
              the value of a PipelineParam is saved to its corresponding local file. It's
              one way for outside world to receive outputs of the container.
@@ -1003,6 +1015,30 @@ class ContainerOp(BaseOp):
         super().__init__(name=name, init_containers=init_containers, sidecars=sidecars, is_exit_handler=is_exit_handler)
 
         self.attrs_with_pipelineparams = BaseOp.attrs_with_pipelineparams + ['_container', 'artifact_location'] #Copying the BaseOp class variable!
+        input_artifact_paths = {}
+        artifact_arguments = {}
+
+        def resolve_artifact_argument(artarg):
+            from ..components._components import _generate_input_file_name
+            if not isinstance(artarg, InputArgumentPath):
+                return artarg
+            input_name = getattr(artarg.input, 'name', artarg.input) or ('input-' + str(len(artifact_arguments)))
+            input_path = artarg.path or _generate_input_file_name(input_name)
+            input_artifact_paths[input_name] = input_path
+            if not isinstance(artarg.argument, str):
+                raise TypeError('Argument "{}" was passed to the artifact input "{}", but only constant strings are supported at this moment.'.format(str(artarg.argument), input_name))
+
+            artifact_arguments[input_name] = artarg.argument
+            return input_path
+
+        for artarg in artifact_argument_paths or []:
+            resolve_artifact_argument(artarg)
+
+        if isinstance(command, Sequence) and not isinstance(command, str):
+            command = list(map(resolve_artifact_argument, command))
+        if isinstance(arguments, Sequence) and not isinstance(arguments, str):
+            arguments = list(map(resolve_artifact_argument, arguments))
+
         # convert to list if not a list
         command = as_string_list(command)
         arguments = as_string_list(arguments)
@@ -1041,6 +1077,8 @@ class ContainerOp(BaseOp):
             setattr(self, attr_to_proxy, _proxy(attr_to_proxy))
 
         # attributes specific to `ContainerOp`
+        self.input_artifact_paths = input_artifact_paths
+        self.artifact_arguments = artifact_arguments
         self.file_outputs = file_outputs
         self.output_artifact_paths = output_artifact_paths or {}
         self.artifact_location = artifact_location
diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py
index 09f02794f1..4f53e1149d 100644
--- a/sdk/python/tests/compiler/compiler_tests.py
+++ b/sdk/python/tests/compiler/compiler_tests.py
@@ -702,3 +702,7 @@ implementation:
     self.assertIsNone(delete_op_template.get("successCondition"))
     self.assertIsNone(delete_op_template.get("failureCondition"))
     self.assertDictEqual(delete_op_template.get("outputs"), {})
+
+  def test_py_input_artifact_raw_value(self):
+    """Test pipeline input_artifact_raw_value."""
+    self._test_py_compile_yaml('input_artifact_raw_value')
diff --git a/sdk/python/tests/compiler/testdata/input_artifact_raw_value.py b/sdk/python/tests/compiler/testdata/input_artifact_raw_value.py
new file mode 100644
index 0000000000..c084cffa8a
--- /dev/null
+++ b/sdk/python/tests/compiler/testdata/input_artifact_raw_value.py
@@ -0,0 +1,69 @@
+#!/usr/bin/env python3
+# Copyright 2018 Google LLC
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import sys
+from pathlib import Path
+
+sys.path.insert(0, __file__ + '/../../../../')
+
+import kfp
+from kfp import dsl
+
+
+def component_with_inline_input_artifact(text: str):
+    return dsl.ContainerOp(
+        name='component_with_inline_input_artifact',
+        image='alpine',
+        command=['cat', dsl.InputArgumentPath(text, path='/tmp/inputs/text/data', input='text')], # path and input are optional
+    )
+
+
+def component_with_input_artifact(text):
+    '''A component that passes text as input artifact'''
+
+    return dsl.ContainerOp(
+        name='component_with_input_artifact',
+        artifact_argument_paths=[
+            dsl.InputArgumentPath(argument=text, path='/tmp/inputs/text/data', input='text'), # path and input are optional
+        ],
+        image='alpine',
+        command=['cat', '/tmp/inputs/text/data'],
+    )
+
+def component_with_hardcoded_input_artifact_value():
+    '''A component that passes hard-coded text as input artifact'''
+    return component_with_input_artifact('hard-coded artifact value')
+
+
+def component_with_input_artifact_value_from_file(file_path):
+    '''A component that passes contents of a file as input artifact'''
+    return component_with_input_artifact(Path(file_path).read_text())
+
+
+@dsl.pipeline(
+    name='Pipeline with artifact input raw argument value.',
+    description='Pipeline shows how to define artifact inputs and pass raw artifacts to them.'
+)
+def input_artifact_pipeline():
+    component_with_inline_input_artifact('Constant artifact value')
+    component_with_input_artifact('Constant artifact value')
+    component_with_hardcoded_input_artifact_value()
+
+    file_path = str(Path(__file__).parent.joinpath('input_artifact_raw_value.txt'))
+    component_with_input_artifact_value_from_file(file_path)
+
+if __name__ == '__main__':
+    kfp.compiler.Compiler().compile(input_artifact_pipeline, __file__ + '.yaml')
diff --git a/sdk/python/tests/compiler/testdata/input_artifact_raw_value.txt b/sdk/python/tests/compiler/testdata/input_artifact_raw_value.txt
new file mode 100644
index 0000000000..945a9d5372
--- /dev/null
+++ b/sdk/python/tests/compiler/testdata/input_artifact_raw_value.txt
@@ -0,0 +1 @@
+Text from a file with hard-coded artifact value
\ No newline at end of file
diff --git a/sdk/python/tests/compiler/testdata/input_artifact_raw_value.yaml b/sdk/python/tests/compiler/testdata/input_artifact_raw_value.yaml
new file mode 100644
index 0000000000..e66fbdd7f9
--- /dev/null
+++ b/sdk/python/tests/compiler/testdata/input_artifact_raw_value.yaml
@@ -0,0 +1,115 @@
+apiVersion: argoproj.io/v1alpha1
+kind: Workflow
+metadata:
+  annotations:
+    pipelines.kubeflow.org/pipeline_spec: '{"description": "Pipeline shows how to define artifact inputs and pass raw artifacts to them.", "name": "Pipeline with artifact input raw argument value."}'
+  generateName: pipeline-with-artifact-input-raw-argument-value-
+spec:
+  arguments:
+    parameters: []
+  entrypoint: pipeline-with-artifact-input-raw-argument-value
+  serviceAccountName: pipeline-runner
+  templates:
+  - container:
+      command:
+      - cat
+      - /tmp/inputs/text/data
+      image: alpine
+    inputs:
+      artifacts:
+      - name: text
+        path: /tmp/inputs/text/data
+    name: component-with-inline-input-artifact
+    outputs:
+      artifacts:
+      - name: mlpipeline-ui-metadata
+        optional: true
+        path: /mlpipeline-ui-metadata.json
+      - name: mlpipeline-metrics
+        optional: true
+        path: /mlpipeline-metrics.json
+  - container:
+      command:
+      - cat
+      - /tmp/inputs/text/data
+      image: alpine
+    inputs:
+      artifacts:
+      - name: text
+        path: /tmp/inputs/text/data
+    name: component-with-input-artifact
+    outputs:
+      artifacts:
+      - name: mlpipeline-ui-metadata
+        optional: true
+        path: /mlpipeline-ui-metadata.json
+      - name: mlpipeline-metrics
+        optional: true
+        path: /mlpipeline-metrics.json
+  - container:
+      command:
+      - cat
+      - /tmp/inputs/text/data
+      image: alpine
+    inputs:
+      artifacts:
+      - name: text
+        path: /tmp/inputs/text/data
+    name: component-with-input-artifact-2
+    outputs:
+      artifacts:
+      - name: mlpipeline-ui-metadata
+        optional: true
+        path: /mlpipeline-ui-metadata.json
+      - name: mlpipeline-metrics
+        optional: true
+        path: /mlpipeline-metrics.json
+  - container:
+      command:
+      - cat
+      - /tmp/inputs/text/data
+      image: alpine
+    inputs:
+      artifacts:
+      - name: text
+        path: /tmp/inputs/text/data
+    name: component-with-input-artifact-3
+    outputs:
+      artifacts:
+      - name: mlpipeline-ui-metadata
+        optional: true
+        path: /mlpipeline-ui-metadata.json
+      - name: mlpipeline-metrics
+        optional: true
+        path: /mlpipeline-metrics.json
+  - dag:
+      tasks:
+      - arguments:
+          artifacts:
+          - name: text
+            raw:
+              data: Constant artifact value
+        name: component-with-inline-input-artifact
+        template: component-with-inline-input-artifact
+      - arguments:
+          artifacts:
+          - name: text
+            raw:
+              data: Constant artifact value
+        name: component-with-input-artifact
+        template: component-with-input-artifact
+      - arguments:
+          artifacts:
+          - name: text
+            raw:
+              data: hard-coded artifact value
+        name: component-with-input-artifact-2
+        template: component-with-input-artifact-2
+      - arguments:
+          artifacts:
+          - name: text
+            raw:
+              data: Text from a file with hard-coded artifact value
+        name: component-with-input-artifact-3
+        template: component-with-input-artifact-3
+    name: pipeline-with-artifact-input-raw-argument-value
diff --git a/sdk/python/tests/components/test_components.py b/sdk/python/tests/components/test_components.py
index 2c621a000a..6d8a330ed6 100644
--- a/sdk/python/tests/components/test_components.py
+++ b/sdk/python/tests/components/test_components.py
@@ -270,6 +270,23 @@ implementation:
         self.assertEqual(task1.arguments[0], '--output-data')
         self.assertTrue(task1.arguments[1].startswith('/'))
 
+    def test_input_path_placeholder_with_constant_argument(self):
+        component_text = '''\
+inputs:
+- {name: input 1}
+implementation:
+  container:
+    image: busybox
+    command:
+    - --input-data
+    - {inputPath: input 1}
+'''
+        task_factory1 = comp.load_component_from_text(component_text)
+        task1 = task_factory1('Text')
+
+        self.assertEqual(task1.command, ['--input-data', task1.input_artifact_paths['input 1']])
+        self.assertEqual(task1.artifact_arguments, {'input 1': 'Text'})
+
     def test_optional_inputs_reordering(self):
        '''Tests optional input reordering.
        In python signature, optional arguments must come after the required arguments.
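For illustration, a minimal pipeline exercising the API this patch adds (a sketch, assuming an SDK build that includes the change; the op and pipeline names below are invented, and the behaviour mirrors the testdata sample above):

import kfp
from kfp import dsl


def print_text(text: str):
    # The constant string becomes a raw input artifact; at run time its value is
    # materialized at the path given to InputArgumentPath before `cat` starts.
    return dsl.ContainerOp(
        name='print-text',
        image='alpine',
        command=['cat', dsl.InputArgumentPath(text, input='text', path='/tmp/inputs/text/data')],
    )


@dsl.pipeline(name='Raw input artifact example')
def raw_input_artifact_example():
    print_text('Hello, world!')


if __name__ == '__main__':
    kfp.compiler.Compiler().compile(raw_input_artifact_example, 'raw_input_artifact_example.yaml')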