SDK - Added support for raw input artifact argument values to ContainerOp (#791)

* SDK - Added support for raw artifact values to ContainerOp

* `ContainerOp` now gets artifact arguments from the command line instead of the constructor.

* Added back input_artifact_arguments to the ContainerOp constructor.
In some scenarios it's hard to provide the artifact arguments through the `command` list when it already has resolved artifact paths.

* Exporting InputArtifactArgument from kfp.dsl

* Updated the sample

* Properly passing artifact arguments as task arguments
as opposed to default input values.

* Renamed input_artifact_arguments to artifact_arguments to reduce confusion

* Renamed InputArtifactArgument to InputArgumentPath
Also renamed input_artifact_arguments to artifact_argument_paths in the ContainerOp's constructor

* Replaced getattr with isinstance checks.
getattr is too fragile and can be broken by renames.

* Fixed the type annotations

* Unlocked the input artifact support in components
Added the test_input_path_placeholder_with_constant_argument test
This commit is contained in:
Alexey Volkov 2019-08-28 21:09:57 -07:00 committed by Kubernetes Prow Robot
parent 6827a2c977
commit 0fc68bbdd4
10 changed files with 289 additions and 9 deletions

View File

@ -127,11 +127,26 @@ def _parameters_to_json(params: List[dsl.PipelineParam]):
return params
# TODO: artifacts?
def _inputs_to_json(
    inputs_params: List[dsl.PipelineParam],
    input_artifact_paths: Optional[Dict[str, str]] = None,
) -> Dict[str, Dict]:
    """Converts a list of PipelineParam into an argo `inputs` JSON obj.

    Args:
        inputs_params: pipeline parameters to expose as argo input parameters.
        input_artifact_paths: optional mapping from input artifact name to the
            local file path where the artifact will be placed in the container.

    Returns:
        A dict with optional 'parameters' and 'artifacts' keys; empty dict
        (falsy) when there are neither parameters nor artifacts.
    """
    parameters = _parameters_to_json(inputs_params)

    # Building the input artifacts section
    artifacts = [
        {'name': name, 'path': path}
        for name, path in (input_artifact_paths or {}).items()
    ]
    artifacts.sort(key=lambda x: x['name'])  # Stabilizing the input artifact ordering

    inputs_dict = {}
    if parameters:
        inputs_dict['parameters'] = parameters
    if artifacts:
        inputs_dict['artifacts'] = artifacts
    return inputs_dict
def _outputs_to_json(op: BaseOp,
@ -213,7 +228,8 @@ def _op_to_template(op: BaseOp):
}
# inputs
inputs = _inputs_to_json(processed_op.inputs)
input_artifact_paths = processed_op.input_artifact_paths if isinstance(processed_op, dsl.ContainerOp) else None
inputs = _inputs_to_json(processed_op.inputs, input_artifact_paths)
if inputs:
template['inputs'] = inputs

View File

@ -515,6 +515,18 @@ class Compiler(object):
})
arguments.sort(key=lambda x: x['name'])
task['arguments'] = {'parameters': arguments}
if isinstance(sub_group, dsl.ContainerOp) and sub_group.artifact_arguments:
artifact_argument_structs = []
for input_name, argument in sub_group.artifact_arguments.items():
artifact_argument_dict = {'name': input_name}
if isinstance(argument, str):
artifact_argument_dict['raw'] = {'data': str(argument)}
else:
raise TypeError('Argument "{}" was passed to the artifact input "{}", but only constant strings are supported at this moment.'.format(str(argument), input_name))
artifact_argument_structs.append(artifact_argument_dict)
task.setdefault('arguments', {})['artifacts'] = artifact_argument_structs
tasks.append(task)
tasks.sort(key=lambda x: x['name'])
template['dag'] = {'tasks': tasks}

View File

@ -15,7 +15,7 @@
from collections import OrderedDict
from typing import Mapping
from ._structures import ContainerImplementation, ConcatPlaceholder, IfPlaceholder, InputValuePlaceholder, InputPathPlaceholder, IsPresentPlaceholder, OutputPathPlaceholder, TaskSpec
from ._components import _generate_output_file_name, _default_component_name
from ._components import _generate_input_file_name, _generate_output_file_name, _default_component_name
from kfp.dsl._metadata import ComponentMeta, ParameterMeta
def create_container_op_from_task(task_spec: TaskSpec):
@ -34,6 +34,9 @@ def create_container_op_from_task(task_spec: TaskSpec):
if output.name in unconfigurable_output_paths:
output_paths[output.name] = unconfigurable_output_paths[output.name]
input_paths = OrderedDict()
artifact_arguments = OrderedDict()
def expand_command_part(arg): #input values with original names
#(Union[str,Mapping[str, Any]]) -> Union[str,List[str]]
if arg is None:
@ -57,8 +60,10 @@ def create_container_op_from_task(task_spec: TaskSpec):
input_name = arg.input_name
input_value = argument_values.get(input_name, None)
if input_value is not None:
raise ValueError('ContainerOp does not support input artifacts - input {}'.format(input_name))
#return input_value
input_path = _generate_input_file_name(input_name)
input_paths[input_name] = input_path
artifact_arguments[input_name] = input_value
return input_path
else:
input_spec = inputs_dict[input_name]
if input_spec.optional:
@ -123,13 +128,15 @@ def create_container_op_from_task(task_spec: TaskSpec):
container_image=container_spec.image,
command=expanded_command,
arguments=expanded_args,
input_paths=input_paths,
output_paths=output_paths,
artifact_arguments=artifact_arguments,
env=container_spec.env,
component_spec=component_spec,
)
def _create_container_op_from_resolved_task(name:str, container_image:str, command=None, arguments=None, output_paths=None, env : Mapping[str, str]=None, component_spec=None):
def _create_container_op_from_resolved_task(name:str, container_image:str, command=None, arguments=None, input_paths=None, artifact_arguments=None, output_paths=None, env : Mapping[str, str]=None, component_spec=None):
from .. import dsl
#Renaming outputs to conform with ContainerOp/Argo
@ -154,6 +161,7 @@ def _create_container_op_from_resolved_task(name:str, container_image:str, comma
command=command,
arguments=arguments,
file_outputs=output_paths_for_container_op,
artifact_argument_paths=[dsl.InputArgumentPath(argument=artifact_arguments[input_name], input=input_name, path=path) for input_name, path in input_paths.items()],
)
task._set_metadata(component_meta)

View File

@ -15,7 +15,7 @@
from ._pipeline_param import PipelineParam, match_serialized_pipelineparam
from ._pipeline import Pipeline, pipeline, get_pipeline_conf
from ._container_op import ContainerOp, UserContainer, Sidecar
from ._container_op import ContainerOp, InputArgumentPath, UserContainer, Sidecar
from ._resource_op import ResourceOp
from ._volume_op import (
VolumeOp, VOLUME_MODE_RWO, VOLUME_MODE_RWM, VOLUME_MODE_ROM

View File

@ -903,6 +903,13 @@ class BaseOp(object):
from ._pipeline_volume import PipelineVolume # The import is here to prevent circular reference problems.
class InputArgumentPath:
    """Describes how a raw artifact argument should be placed as a file.

    Holds the artifact argument value together with the (optional) input name
    and the (optional) local path where the value should be materialized at
    pipeline run time.
    """

    def __init__(self, argument, input=None, path=None):
        # NOTE: `input` shadows the builtin, but the parameter name is part of
        # the public interface and must stay.
        self.argument = argument
        self.input = input
        self.path = path
class ContainerOp(BaseOp):
"""
Represents an op implemented by a container image.
@ -961,6 +968,7 @@ class ContainerOp(BaseOp):
init_containers: List[UserContainer] = None,
sidecars: List[Sidecar] = None,
container_kwargs: Dict = None,
artifact_argument_paths: List[InputArgumentPath] = None,
file_outputs: Dict[str, str] = None,
output_artifact_paths : Dict[str, str]=None,
artifact_location: V1alpha1ArtifactLocation=None,
@ -984,6 +992,10 @@ class ContainerOp(BaseOp):
together with the `main` container.
container_kwargs: the dict of additional keyword arguments to pass to the
op's `Container` definition.
artifact_argument_paths: Optional. Maps input artifact arguments (values or references) to the local file paths where they'll be placed.
At pipeline run time, the value of the artifact argument is saved to a local file with specified path.
This parameter is only needed when the input file paths are hard-coded in the program.
Otherwise it's better to pass input artifact placement paths by including artifact arguments in the command-line using the InputArgumentPath class instances.
file_outputs: Maps output labels to local file paths. At pipeline run time,
the value of a PipelineParam is saved to its corresponding local file. It's
one way for outside world to receive outputs of the container.
@ -1003,6 +1015,30 @@ class ContainerOp(BaseOp):
super().__init__(name=name, init_containers=init_containers, sidecars=sidecars, is_exit_handler=is_exit_handler)
self.attrs_with_pipelineparams = BaseOp.attrs_with_pipelineparams + ['_container', 'artifact_location'] #Copying the BaseOp class variable!
input_artifact_paths = {}
artifact_arguments = {}
def resolve_artifact_argument(candidate):
    """Resolves a possible InputArgumentPath into its local input file path.

    Anything that is not an InputArgumentPath passes through unchanged.
    Otherwise the artifact is recorded in the enclosing
    `input_artifact_paths`/`artifact_arguments` maps and the local path
    where the artifact will be placed is returned in its stead.
    """
    from ..components._components import _generate_input_file_name
    if not isinstance(candidate, InputArgumentPath):
        return candidate
    # Input name: explicit string, the .name of an input object, or an
    # auto-generated positional fallback.
    resolved_name = getattr(candidate.input, 'name', candidate.input) or ('input-' + str(len(artifact_arguments)))
    resolved_path = candidate.path or _generate_input_file_name(resolved_name)
    input_artifact_paths[resolved_name] = resolved_path
    if not isinstance(candidate.argument, str):
        raise TypeError('Argument "{}" was passed to the artifact input "{}", but only constant strings are supported at this moment.'.format(str(candidate.argument), resolved_name))
    artifact_arguments[resolved_name] = candidate.argument
    return resolved_path
for artarg in artifact_argument_paths or []:
resolve_artifact_argument(artarg)
if isinstance(command, Sequence) and not isinstance(command, str):
command = list(map(resolve_artifact_argument, command))
if isinstance(arguments, Sequence) and not isinstance(arguments, str):
arguments = list(map(resolve_artifact_argument, arguments))
# convert to list if not a list
command = as_string_list(command)
arguments = as_string_list(arguments)
@ -1041,6 +1077,8 @@ class ContainerOp(BaseOp):
setattr(self, attr_to_proxy, _proxy(attr_to_proxy))
# attributes specific to `ContainerOp`
self.input_artifact_paths = input_artifact_paths
self.artifact_arguments = artifact_arguments
self.file_outputs = file_outputs
self.output_artifact_paths = output_artifact_paths or {}
self.artifact_location = artifact_location

View File

@ -702,3 +702,7 @@ implementation:
self.assertIsNone(delete_op_template.get("successCondition"))
self.assertIsNone(delete_op_template.get("failureCondition"))
self.assertDictEqual(delete_op_template.get("outputs"), {})
def test_py_input_artifact_raw_value(self):
    """Test compiling the input_artifact_raw_value sample pipeline to YAML and comparing it against the golden file."""
    self._test_py_compile_yaml('input_artifact_raw_value')

View File

@ -0,0 +1,69 @@
#!/usr/bin/env python3
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import sys
from pathlib import Path
sys.path.insert(0, __file__ + '/../../../../')
import kfp
from kfp import dsl
def component_with_inline_input_artifact(text: str):
    """A component that receives `text` through an artifact placeholder put directly in the command line."""
    # path and input are optional
    text_artifact = dsl.InputArgumentPath(text, path='/tmp/inputs/text/data', input='text')
    return dsl.ContainerOp(
        name='component_with_inline_input_artifact',
        image='alpine',
        command=['cat', text_artifact],
    )
def component_with_input_artifact(text):
    '''A component that passes text as input artifact'''
    # path and input are optional
    text_artifact = dsl.InputArgumentPath(argument=text, path='/tmp/inputs/text/data', input='text')
    return dsl.ContainerOp(
        name='component_with_input_artifact',
        image='alpine',
        command=['cat', '/tmp/inputs/text/data'],
        artifact_argument_paths=[text_artifact],
    )
def component_with_hardcoded_input_artifact_value():
    '''A component that passes hard-coded text as input artifact'''
    hardcoded_value = 'hard-coded artifact value'
    return component_with_input_artifact(hardcoded_value)
def component_with_input_artifact_value_from_file(file_path):
    '''A component that passes contents of a file as input artifact'''
    file_contents = Path(file_path).read_text()
    return component_with_input_artifact(file_contents)
@dsl.pipeline(
    name='Pipeline with artifact input raw argument value.',
    description='Pipeline shows how to define artifact inputs and pass raw artifacts to them.'
)
def input_artifact_pipeline():
    """Instantiates every sample component so all artifact-argument styles are exercised."""
    component_with_inline_input_artifact('Constant artifact value')
    component_with_input_artifact('Constant artifact value')
    component_with_hardcoded_input_artifact_value()

    raw_value_file = str(Path(__file__).parent / 'input_artifact_raw_value.txt')
    component_with_input_artifact_value_from_file(raw_value_file)
if __name__ == '__main__':
    # Compile the sample pipeline to an Argo workflow YAML file next to this script.
    kfp.compiler.Compiler().compile(input_artifact_pipeline, __file__ + '.yaml')

View File

@ -0,0 +1 @@
Text from a file with hard-coded artifact value

View File

@ -0,0 +1,115 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
annotations:
pipelines.kubeflow.org/pipeline_spec: '{"description": "Pipeline shows how to define artifact inputs and pass raw artifacts to them.", "name": "Pipeline with artifact input raw argument value."}'
generateName: pipeline-with-artifact-input-raw-argument-value-
spec:
arguments:
parameters: []
entrypoint: pipeline-with-artifact-input-raw-argument-value
serviceAccountName: pipeline-runner
templates:
- container:
command:
- cat
- /tmp/inputs/text/data
image: alpine
inputs:
artifacts:
- name: text
path: /tmp/inputs/text/data
name: component-with-inline-input-artifact
outputs:
artifacts:
- name: mlpipeline-ui-metadata
optional: true
path: /mlpipeline-ui-metadata.json
- name: mlpipeline-metrics
optional: true
path: /mlpipeline-metrics.json
- container:
command:
- cat
- /tmp/inputs/text/data
image: alpine
inputs:
artifacts:
- name: text
path: /tmp/inputs/text/data
name: component-with-input-artifact
outputs:
artifacts:
- name: mlpipeline-ui-metadata
optional: true
path: /mlpipeline-ui-metadata.json
- name: mlpipeline-metrics
optional: true
path: /mlpipeline-metrics.json
- container:
command:
- cat
- /tmp/inputs/text/data
image: alpine
inputs:
artifacts:
- name: text
path: /tmp/inputs/text/data
name: component-with-input-artifact-2
outputs:
artifacts:
- name: mlpipeline-ui-metadata
optional: true
path: /mlpipeline-ui-metadata.json
- name: mlpipeline-metrics
optional: true
path: /mlpipeline-metrics.json
- container:
command:
- cat
- /tmp/inputs/text/data
image: alpine
inputs:
artifacts:
- name: text
path: /tmp/inputs/text/data
name: component-with-input-artifact-3
outputs:
artifacts:
- name: mlpipeline-ui-metadata
optional: true
path: /mlpipeline-ui-metadata.json
- name: mlpipeline-metrics
optional: true
path: /mlpipeline-metrics.json
- dag:
tasks:
- arguments:
artifacts:
- name: text
raw:
data: Constant artifact value
name: component-with-inline-input-artifact
template: component-with-inline-input-artifact
- arguments:
artifacts:
- name: text
raw:
data: Constant artifact value
name: component-with-input-artifact
template: component-with-input-artifact
- arguments:
artifacts:
- name: text
raw:
data: hard-coded artifact value
name: component-with-input-artifact-2
template: component-with-input-artifact-2
- arguments:
artifacts:
- name: text
raw:
data: Text from a file with hard-coded artifact value
name: component-with-input-artifact-3
template: component-with-input-artifact-3
name: pipeline-with-artifact-input-raw-argument-value

View File

@ -270,6 +270,23 @@ implementation:
self.assertEqual(task1.arguments[0], '--output-data')
self.assertTrue(task1.arguments[1].startswith('/'))
def test_input_path_placeholder_with_constant_argument(self):
    # Verifies that a constant string passed to an inputPath placeholder is
    # recorded as an artifact argument and replaced in the command line by the
    # generated local input file path.
    # NOTE(review): the YAML literal's internal nesting appears lost in this
    # view (container/image/command should be nested under implementation) —
    # confirm against the original file before relying on it.
    component_text = '''\
inputs:
- {name: input 1}
implementation:
container:
image: busybox
command:
- --input-data
- {inputPath: input 1}
'''
    task_factory1 = comp.load_component_from_text(component_text)
    task1 = task_factory1('Text')
    self.assertEqual(task1.command, ['--input-data', task1.input_artifact_paths['input 1']])
    self.assertEqual(task1.artifact_arguments, {'input 1': 'Text'})
def test_optional_inputs_reordering(self):
'''Tests optional input reordering.
In python signature, optional arguments must come after the required arguments.