feat(sdk): Implements artifact URI placeholder. (#4932)

* add placeholder to spec

* add output_directory to pipeline

* respect uri placeholder in file outputs

* wip: add data passing rewriting logic to respect the uri semantics

* merge input_uri and paths when instantiating ContainerOp

* fix

* fix workflow rewriting

* Add topology rewriting

* add a test case, and various fixes

* make the test case more complex

* Fix the case when working with OpsGroup

* Fix test case

* fix resolving test

* fix redundant cmd lines

* fix redundant cmd lines

* resolve comments

* fix file outputs

* resolve comments

* copy file outputs instead of modifying inplace.
Jiaxiao Zheng 2021-01-05 20:39:51 -08:00 committed by GitHub
parent c8c9905bf2
commit 7540ba5c3b
11 changed files with 790 additions and 45 deletions


@@ -1,7 +1,11 @@
import copy
import json
import os
import re
from typing import List, Optional, Set
from typing import Any, Dict, List, Optional, Set, Tuple
from kfp.components import _components
def fix_big_data_passing(workflow: dict) -> dict:
'''fix_big_data_passing converts a workflow where some artifact data is passed as parameters into a workflow where this data is passed as artifacts.
@@ -25,9 +29,7 @@ def fix_big_data_passing(workflow: dict) -> dict:
3. Propagate the consumption information upstream to all inputs/outputs all the way up to the data producers.
4. Convert the inputs, outputs and arguments based on how they're consumed downstream.
'''
workflow = copy.deepcopy(workflow)
templates = workflow['spec']['templates']
container_templates = [template for template in workflow['spec']['templates'] if 'container' in template]
dag_templates = [template for template in workflow['spec']['templates'] if 'dag' in template]
@@ -381,3 +383,337 @@ def deconstruct_single_placeholder(s: str) -> List[str]:
if not re.fullmatch('{{[-._a-zA-Z0-9]+}}', s):
return None
return s.lstrip('{').rstrip('}').split('.')
def _replace_output_dir_placeholder(command_line: str,
output_directory: Optional[str] = None) -> str:
"""Replaces the output directory placeholder."""
if _components.OUTPUT_DIR_PLACEHOLDER in command_line:
if not output_directory:
raise ValueError('output_directory of a pipeline must be specified '
'when URI placeholder is used.')
return command_line.replace(
_components.OUTPUT_DIR_PLACEHOLDER, output_directory)
return command_line
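# For example, with output_directory='gs://my-bucket/my-output-dir' (as in the
# test pipeline below), the command line
#   '{{kfp.output_dir}}/{{workflow.uid}}/{{pod.name}}/out1'
# becomes
#   'gs://my-bucket/my-output-dir/{{workflow.uid}}/{{pod.name}}/out1'
# ('out1' is a hypothetical output name).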
def _refactor_outputs_if_uri_placeholder(
container_template: Dict[str, Any],
output_to_filename: Dict[str, str]
) -> None:
"""Rewrites the output of the container in case of URI placeholder.
Also collects the mapping from output names to output file names.
Args:
container_template: The container template structure.
output_to_filename: The mapping from the artifact name to the actual file
name of the content. This will be used later when reconciling the
URIs on the consumer side.
"""
# If there's no artifact outputs then no refactor is needed.
if not container_template.get('outputs') or not container_template[
'outputs'].get('artifacts'):
return
parameter_outputs = container_template['outputs'].get('parameters') or []
new_artifact_outputs = []
for artifact_output in container_template['outputs']['artifacts']:
# Check if this is an output associated with URI placeholder based
# on its path.
if _components.OUTPUT_DIR_PLACEHOLDER in artifact_output['path']:
# If so, we'll add a parameter output to output the pod name
parameter_outputs.append(
{
'name': _components.PRODUCER_POD_NAME_PARAMETER.format(
artifact_output['name']),
'value': '{{pod.name}}'
})
output_to_filename[artifact_output['name']] = os.path.basename(
artifact_output['path'])
else:
# Otherwise, this artifact output is preserved.
new_artifact_outputs.append(artifact_output)
container_template['outputs']['artifacts'] = new_artifact_outputs
container_template['outputs']['parameters'] = parameter_outputs
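# Example of the rewrite performed by _refactor_outputs_if_uri_placeholder
# above, using a hypothetical output named 'out1': the artifact entry
#   {'name': 'out1', 'path': '{{kfp.output_dir}}/{{workflow.uid}}/{{pod.name}}/out1'}
# is removed from outputs.artifacts and replaced by the parameter output
#   {'name': 'out1-producer-pod-id-', 'value': '{{pod.name}}'},
# while output_to_filename records {'out1': 'out1'}.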
def _refactor_inputs_if_uri_placeholder(
container_template: Dict[str, Any],
output_to_filename: Dict[str, str],
refactored_inputs: Dict[Tuple[str, str], str]
) -> None:
"""Rewrites the input of the container in case of URI placeholder.
Rewrites an artifact input of the container when it is consumed as a URI
placeholder. Also collects the rewritten inputs into a mapping, so that they
can be wired to the correct task outputs later. Meanwhile, the file name used
by the input URI placeholder is reconciled with the one used by its producer.
Args:
container_template: The container template structure.
output_to_filename: The mapping from output name to the file name.
refactored_inputs: The mapping used to collect the input artifact being
refactored from (template name, previous name) to its new name.
"""
# If there's no artifact inputs then no refactor is needed.
if not container_template.get('inputs') or not container_template[
'inputs'].get('artifacts'):
return
parameter_inputs = container_template['inputs'].get('parameters') or []
new_artifact_inputs = []
for artifact_input in container_template['inputs']['artifacts']:
# Check if this is an input artifact associated with URI placeholder,
# according to its path.
if _components.OUTPUT_DIR_PLACEHOLDER in artifact_input['path']:
# If so, we'll add a parameter input to receive the producer's pod
# name.
# The correct input parameter name should be parsed from the
# path field, which is given according to the component I/O
# definition.
m = re.match(
r'.*/{{workflow\.uid}}/{{inputs\.parameters\.(?P<input_name>.*)'
r'}}/.*',
artifact_input['path'])
input_name = m.group('input_name')
parameter_inputs.append({'name': input_name})
# Here we're using the template name + previous artifact input name
# as key, because it will be refactored later at the DAG level.
refactored_inputs[(container_template['name'],
artifact_input['name'])] = input_name
# In the container implementation, the pod name is already connected
# to the input parameter per the implementation in _components.
# The only thing yet to be reconciled is the file name.
def reconcile_filename(
command_lines: List[str]) -> List[str]:
new_command_lines = []
for cmd in command_lines:
matched = re.match(
r'.*/{{workflow\.uid}}/{{inputs\.parameters\.'
+ input_name + r'}}/(?P<filename>.*)', cmd)
if matched:
new_command_lines.append(
cmd[:-len(matched.group('filename'))] +
output_to_filename[artifact_input['name']])
else:
new_command_lines.append(cmd)
return new_command_lines
if container_template['container'].get('args'):
container_template['container']['args'] = reconcile_filename(
container_template['container']['args'])
if container_template['container'].get('command'):
container_template['container']['command'] = reconcile_filename(
container_template['container']['command'])
else:
new_artifact_inputs.append(artifact_input)
container_template['inputs']['artifacts'] = new_artifact_inputs
container_template['inputs']['parameters'] = parameter_inputs
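# Example of the rewrite performed by _refactor_inputs_if_uri_placeholder
# above, using hypothetical names: an input artifact
# 'write-to-gcs-output_gcs_path' whose path ends with
#   '{{inputs.parameters.input_gcs_path-producer-pod-id-}}/output_gcs_path'
# is dropped, the parameter input {'name': 'input_gcs_path-producer-pod-id-'}
# is added, refactored_inputs[(template name, 'write-to-gcs-output_gcs_path')]
# is set to 'input_gcs_path-producer-pod-id-', and the trailing file name in
# the container command/args is replaced with the producer's file name
# recorded in output_to_filename.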
def _refactor_dag_inputs(
dag_template: Dict[str, Any],
refactored_inputs: Dict[Tuple[str, str], str]
) -> None:
"""Refactors the inputs of the DAG template.
Args:
dag_template: The DAG template structure.
refactored_inputs: The mapping of template and input names to be
refactored, to its new name.
"""
# One hacky way to do the refactoring is by looking at the name of the
# artifact argument. If the name appears in the refactored_inputs mapping,
# this should be changed to a parameter input regardless of the template
# name.
# The correctness of this approach is ensured by the data passing rewriting
# process that changed the artifact inputs' name to be
# '{{output_template}}-{{output_name}}', which is consistent across all
# templates.
if not dag_template.get('inputs', {}).get('artifacts'):
return
artifact_to_new_name = {k[1] : v for k, v in refactored_inputs.items()}
parameter_inputs = dag_template['inputs'].get('parameters') or []
new_artifact_inputs = []
for input_artifact in dag_template['inputs']['artifacts']:
if input_artifact['name'] in artifact_to_new_name:
parameter_inputs.append(
{'name': artifact_to_new_name[input_artifact['name']]})
refactored_inputs[(dag_template['name'],
input_artifact['name'])] = artifact_to_new_name[
input_artifact['name']]
else:
new_artifact_inputs.append(input_artifact)
dag_template['inputs']['artifacts'] = new_artifact_inputs
dag_template['inputs']['parameters'] = parameter_inputs
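# For illustration (hypothetical names): if refactored_inputs maps
# ('read-from-gcs', 'write-to-gcs-output_gcs_path') to
# 'input_gcs_path-producer-pod-id-', then a DAG template input artifact named
# 'write-to-gcs-output_gcs_path' is likewise replaced by the parameter input
# {'name': 'input_gcs_path-producer-pod-id-'}, and the DAG template's own
# entry is added to refactored_inputs so that enclosing DAGs can be rewired
# as well.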
def _refactor_dag_template_uri_inputs(
dag_template: Dict[str, Any],
refactored_inputs: Dict[Tuple[str, str], str]
) -> None:
"""Refactors artifact inputs within the DAG template.
An artifact input will be changed to a parameter input if it needs to be
connected to the producer's pod name output. This is determined by whether
the input is present in the refactored_inputs mapping, which is generated by
the container template refactoring process `_refactor_inputs_if_uri_placeholder`.
Args:
dag_template: The DAG template structure.
refactored_inputs: The mapping of template and input names to be
refactored, to its new name.
"""
# Traverse the tasks in the DAG, and inspect the task arguments.
for task in dag_template['dag'].get('tasks', []):
if not task.get('arguments') or not task['arguments'].get('artifacts'):
continue
artifact_args = task['arguments']['artifacts']
new_artifact_args = []
template_name = task['name']
parameter_args = task.get('arguments', {}).get('parameters', [])
for artifact_arg in artifact_args:
assert 'name' in artifact_arg, (
'Illegal artifact format: %s' % artifact_arg)
if (template_name, artifact_arg['name']) in refactored_inputs:
# If this is an input artifact that has been refactored.
# It will be changed to an input parameter receiving the
# producer's pod name.
pod_parameter_name = refactored_inputs[
(template_name, artifact_arg['name'])]
# There are two cases for a DAG template.
assert (artifact_arg.get('from', '').startswith(
'{{inputs.') or artifact_arg.get('from', '').startswith(
'{{tasks.')), (
"Illegal 'from' found for argument %s" % artifact_arg)
arg_from = artifact_arg['from']
if arg_from.startswith('{{tasks.'):
# 1. The argument to refactor is from another task in the same
# DAG.
task_matches = re.match(
r'{{tasks\.(?P<task_name>.*)\.outputs\.artifacts'
r'\.(?P<output_name>.*)}}',
arg_from)
task_name, output_name = task_matches.group(
'task_name'), task_matches.group('output_name')
parameter_args.append({
'name': pod_parameter_name,
'value': '{{{{tasks.{task_name}.outputs.'
'parameters.{output}}}}}'.format(
task_name=task_name,
output=_components.PRODUCER_POD_NAME_PARAMETER.format(
output_name)
)})
else:
# 2. The assert above ensures that the argument to refactor
# is from an input of the current DAG template.
# Then, we'll reconnect it to the DAG input, which has been
# renamed.
input_matches = re.match(
r'{{inputs\.artifacts\.(?P<input_name>.*)}}',
arg_from)
assert input_matches, (
'The matched input is expected to be an artifact, '
'got a parameter instead: %s' % arg_from)
# Get the corresponding refactored name of this DAG template
new_input = refactored_inputs[(
dag_template['name'],
input_matches.group('input_name'))]
parameter_args.append({
'name': pod_parameter_name,
'value': '{{{{inputs.parameters.{new_input}}}}}'.format(
new_input=new_input,
)
})
else:
# Otherwise this artifact input will be preserved
new_artifact_args.append(artifact_arg)
task['arguments']['artifacts'] = new_artifact_args
task['arguments']['parameters'] = parameter_args
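# For illustration, in the compiled uri_artifacts test data at the end of this
# commit, a task argument that previously consumed the artifact
#   '{{tasks.write-to-gcs.outputs.artifacts.write-to-gcs-output_gcs_path}}'
# is rewritten into the parameter argument
#   {'name': 'input_gcs_path-producer-pod-id-',
#    'value': '{{tasks.write-to-gcs.outputs.parameters.write-to-gcs-output_gcs_path-producer-pod-id-}}'}.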
def add_pod_name_passing(
workflow: Dict[str, Any],
output_directory: Optional[str] = None) -> Dict[str, Any]:
"""Refactors the workflow structure to pass pod names when needded.
Args:
workflow: The workflow structure.
output_directory: The specified output path.
Returns:
Modified workflow structure.
Raises:
ValueError: when uri placeholder is used in the workflow but no output
directory was provided.
"""
workflow = copy.deepcopy(workflow)
# Sets of templates representing a container task.
container_templates = [template for template in
workflow['spec']['templates'] if
'container' in template]
# Sets of templates representing a (sub)DAG.
dag_templates = [template for template in
workflow['spec']['templates'] if
'dag' in template]
# 1. If there's an output using outputUri placeholder, then this container
# template needs to declare an output to pass its pod name to the downstream
# consumer. The name of the added output will be
# {{output-name}}-producer-pod-id.
# Also, eliminate the existing file artifact.
output_to_filename = {}
for idx, template in enumerate(container_templates):
_refactor_outputs_if_uri_placeholder(template, output_to_filename)
# 2. If there's an input using inputUri placeholder, then this container
# template needs to declare an input to receive the pod name of the producer
# task.
refactored_inputs = {}
for template in container_templates:
_refactor_inputs_if_uri_placeholder(
template, output_to_filename, refactored_inputs)
# For DAG templates, we need to figure out all the inputs that are
# eventually being refactored down to the container template level.
for template in dag_templates:
_refactor_dag_inputs(template, refactored_inputs)
# 3. In the DAG templates, wire the pod name inputs/outputs together.
for template in dag_templates:
_refactor_dag_template_uri_inputs(template, refactored_inputs)
# 4. For all the container command/args, replace {{kfp.output_dir}}
# placeholders with the actual output directory specified.
# Also, the file names need to be reconciled in the consumer to keep
# consistent with producer.
for template in container_templates:
# Process {{kfp.output_dir}} placeholders.
args = template['container'].get('args') or []
if args:
new_args = [_replace_output_dir_placeholder(arg, output_directory)
for arg in args]
template['container']['args'] = new_args
cmds = template['container'].get('command') or []
if cmds:
new_cmds = [_replace_output_dir_placeholder(cmd, output_directory)
for cmd in cmds]
template['container']['command'] = new_cmds
clean_up_empty_workflow_structures(workflow)
return workflow


@@ -24,6 +24,7 @@ from typing import Callable, Set, List, Text, Dict, Tuple, Any, Union, Optional
import kfp
from kfp.dsl import _for_loop
from kfp.compiler import _data_passing_rewriter
from .. import dsl
from ._k8s_helper import convert_k8s_obj_to_json, sanitize_k8s_name
@@ -872,6 +873,10 @@ class Compiler(object):
from ._data_passing_rewriter import fix_big_data_passing
workflow = fix_big_data_passing(workflow)
output_directory = getattr(pipeline_func, 'output_directory', None)
workflow = _data_passing_rewriter.add_pod_name_passing(workflow,
output_directory)
if pipeline_conf and pipeline_conf.data_passing_method != None:
workflow = pipeline_conf.data_passing_method(workflow)


@@ -20,7 +20,7 @@ __all__ = [
]
import copy
import sys
import os
from collections import OrderedDict
from typing import Any, Callable, List, Mapping, NamedTuple, Sequence, Union
from ._naming import _sanitize_file_name, _sanitize_python_function_name, generate_unique_name_conversion_table
@@ -186,6 +186,50 @@ def _generate_output_file_name(port_name):
return _outputs_dir + '/' + _sanitize_file_name(port_name) + '/' + _single_io_file_name
# Placeholder to represent the output directory hosting all the generated URIs.
# Its actual value will be specified during pipeline compilation.
OUTPUT_DIR_PLACEHOLDER = '{{kfp.output_dir}}'
# Format of the Argo parameter used to pass the producer's Pod ID to
# the consumer.
PRODUCER_POD_NAME_PARAMETER = '{}-producer-pod-id-'
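# For example, PRODUCER_POD_NAME_PARAMETER.format('Out1') yields
# 'Out1-producer-pod-id-' ('Out1' being a hypothetical output name).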
def _generate_output_uri(port_name: str) -> str:
"""Generates a unique URI for an output.
Args:
port_name: The name of the output associated with this URI.
Returns:
The URI assigned to this output, which is unique within the pipeline.
"""
return os.path.join(
OUTPUT_DIR_PLACEHOLDER,
'{{workflow.uid}}',
'{{pod.name}}',
port_name
)
def _generate_input_uri(port_name: str) -> str:
"""Generates the URI for an input.
Args:
port_name: The name of the input associated with this URI.
Returns:
The URI assigned to this input, which will be consistent with the URI where
the actual content is written after compilation.
"""
return os.path.join(
OUTPUT_DIR_PLACEHOLDER,
'{{workflow.uid}}',
'{{{{inputs.parameters.{input}}}}}'.format(
input=PRODUCER_POD_NAME_PARAMETER.format(port_name)),
port_name
)
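# For illustration, with the hypothetical port names used in the updated test
# below:
#   _generate_output_uri('Out1') ->
#       '{{kfp.output_dir}}/{{workflow.uid}}/{{pod.name}}/Out1'
#   _generate_input_uri('In1') ->
#       '{{kfp.output_dir}}/{{workflow.uid}}/{{inputs.parameters.In1-producer-pod-id-}}/In1'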
def _react_to_incompatible_reference_type(
input_type,
argument_type,
@@ -387,18 +431,14 @@ _ResolvedCommandLineAndPaths = NamedTuple(
)
def _not_implemented(name: str) -> str:
raise NotImplementedError
def _resolve_command_line_and_paths(
component_spec: ComponentSpec,
arguments: Mapping[str, str],
input_path_generator: Callable[[str], str] = _generate_input_file_name,
output_path_generator: Callable[[str], str] = _generate_output_file_name,
argument_serializer: Callable[[str], str] = serialize_value,
input_uri_generator: Callable[[str], str] = _not_implemented,
output_uri_generator: Callable[[str], str] = _not_implemented,
input_uri_generator: Callable[[str], str] = _generate_input_uri,
output_uri_generator: Callable[[str], str] = _generate_output_uri,
) -> _ResolvedCommandLineAndPaths:
"""Resolves the command line argument placeholders. Also produces the maps of the generated inpuit/output paths."""
argument_values = arguments
@@ -469,7 +509,6 @@ def _resolve_command_line_and_paths(
elif isinstance(arg, InputUriPlaceholder):
input_name = arg.input_name
input_argument = argument_values.get(input_name, None)
if input_name in argument_values:
input_uri = input_uri_generator(input_name)
input_uris[input_name] = input_uri


@@ -741,9 +741,7 @@ implementation:
task = op(in1='foo')
resolved_cmd = _resolve_command_line_and_paths(
component_spec=task.component_ref.spec,
arguments=task.arguments,
input_uri_generator=lambda name: f"{{{{inputs[{name}].uri}}}}",
output_uri_generator=lambda name: f"{{{{outputs[{name}].uri}}}}",
arguments=task.arguments
)
self.assertEqual(
@@ -751,9 +749,9 @@ implementation:
[
'program',
'--in1-uri',
'{{inputs[In1].uri}}',
'{{kfp.output_dir}}/{{workflow.uid}}/{{inputs.parameters.In1-producer-pod-id-}}/In1',
'--out1-uri',
'{{outputs[Out1].uri}}',
'{{kfp.output_dir}}/{{workflow.uid}}/{{pod.name}}/Out1',
]
)


@@ -0,0 +1,56 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""The entrypoint binary used in KFP component."""
import argparse
import sys
class ParseKwargs(argparse.Action):
"""Helper class to parse the keyword arguments.
This Python binary expects a set of kwargs, whose keys are not predefined.
"""
def __call__(
self, parser, namespace, values, option_string=None):
setattr(namespace, self.dest, dict())
assert len(values) % 2 == 0, 'Each specified arg key must have a value.'
current_key = None
for idx, value in enumerate(values):
if idx % 2 == 0:
# Parse this into a key.
current_key = value
else:
# Parse current value with the previous key.
getattr(namespace, self.dest)[current_key] = value
def main():
"""The main program of KFP container entrypoint.
This entrypoint should be called as follows:
python run_container.py -k key1 value1 key2 value2 ...
The recognized argument keys are as follows:
- {input-parameter-name}_metadata_file
- {input-parameter-name}_field_name
- {}
"""
parser = argparse.ArgumentParser()
parser.add_argument('-k', '--kwargs', nargs='*', action=ParseKwargs)
args = parser.parse_args(sys.argv[1:]) # Skip the file name.
print(args.kwargs)
if __name__ == '__main__':
main()
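# For example (hypothetical keys), invoking
#   python run_container.py -k input1_metadata_file /tmp/meta.json input1_field_name uri
# parses the pairs into a dict and prints
#   {'input1_metadata_file': '/tmp/meta.json', 'input1_field_name': 'uri'}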


@@ -12,6 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import copy
from typing import Any, Mapping
from ..components.structures import ComponentSpec, ComponentReference
@@ -46,20 +47,36 @@ def _create_container_op_from_component_and_arguments(
old_warn_value = dsl.ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING
dsl.ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING = True
# Merge output_paths and output_uris to get the file_outputs.
file_outputs = collections.OrderedDict(resolved_cmd.output_paths or {})
for name, output_uri in resolved_cmd.output_uris.items():
file_outputs[name] = output_uri
artifact_argument_paths = [
dsl.InputArgumentPath(
argument=arguments[input_name],
input=input_name,
path=path,
)
for input_name, path in resolved_cmd.input_paths.items()
]
for input_name, input_uri in resolved_cmd.input_uris.items():
artifact_argument_paths.append(
dsl.InputArgumentPath(
argument=arguments[input_name],
input=input_name,
path=input_uri
))
task = dsl.ContainerOp(
name=component_spec.name or _default_component_name,
image=container_spec.image,
command=resolved_cmd.command,
arguments=resolved_cmd.args,
file_outputs=resolved_cmd.output_paths,
artifact_argument_paths=[
dsl.InputArgumentPath(
argument=arguments[input_name],
input=input_name,
path=path,
)
for input_name, path in resolved_cmd.input_paths.items()
],
file_outputs=file_outputs,
artifact_argument_paths=artifact_argument_paths,
)
dsl.ContainerOp._DISABLE_REUSABLE_COMPONENT_WARNING = old_warn_value


@@ -1011,19 +1011,19 @@ class ContainerOp(BaseOp):
_DISABLE_REUSABLE_COMPONENT_WARNING = False
def __init__(
self,
name: str,
image: str,
command: StringOrStringList = None,
arguments: StringOrStringList = None,
init_containers: List[UserContainer] = None,
sidecars: List[Sidecar] = None,
container_kwargs: Dict = None,
artifact_argument_paths: List[InputArgumentPath] = None,
file_outputs: Dict[str, str] = None,
output_artifact_paths: Dict[str, str]=None,
is_exit_handler=False,
pvolumes: Dict[str, V1Volume] = None,
self,
name: str,
image: str,
command: Optional[StringOrStringList] = None,
arguments: Optional[StringOrStringList] = None,
init_containers: Optional[List[UserContainer]] = None,
sidecars: Optional[List[Sidecar]] = None,
container_kwargs: Optional[Dict] = None,
artifact_argument_paths: Optional[List[InputArgumentPath]] = None,
file_outputs: Optional[Dict[str, str]] = None,
output_artifact_paths: Optional[Dict[str, str]] = None,
is_exit_handler: bool = False,
pvolumes: Optional[Dict[str, V1Volume]] = None,
):
super().__init__(name=name, init_containers=init_containers, sidecars=sidecars, is_exit_handler=is_exit_handler)


@@ -12,23 +12,27 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Union
from typing import Callable, Optional, Union
from kubernetes.client.models import V1PodDNSConfig
from . import _container_op
from . import _resource_op
from . import _ops_group
from ._component_bridge import _create_container_op_from_component_and_arguments, _sanitize_python_function_name
from ._component_bridge import \
_create_container_op_from_component_and_arguments, \
_sanitize_python_function_name
from ..components import _components
from ..components._naming import _make_name_unique_by_adding_index
import sys
# This handler is called whenever the @pipeline decorator is applied.
# It can be used by command-line DSL compiler to inject code that runs for every pipeline definition.
_pipeline_decorator_handler = None
def pipeline(name : str = None, description : str = None):
def pipeline(
name: Optional[str] = None,
description: Optional[str] = None,
output_directory: Optional[str] = None):
"""Decorator of pipeline functions.
Example
@@ -37,15 +41,27 @@ def pipeline(name : str = None, description : str = None):
@pipeline(
name='my awesome pipeline',
description='Is it really awesome?',
output_directory='gs://my-bucket/my-output-path'
)
def my_pipeline(a: PipelineParam, b: PipelineParam):
...
Args:
name: The pipeline name. Defaults to a sanitized version of the function
name.
description: Optionally, a human-readable description of the pipeline.
output_directory: The root directory under which the input/output URIs of
this pipeline are generated. This is required if an input/output URI
placeholder is used in the pipeline.
"""
def _pipeline(func):
def _pipeline(func: Callable):
if name:
func._component_human_name = name
if description:
func._component_description = description
if output_directory:
func.output_directory = output_directory
if _pipeline_decorator_handler:
return _pipeline_decorator_handler(func) or func
@@ -54,6 +70,7 @@ def pipeline(name : str = None, description : str = None):
return _pipeline
class PipelineConf():
"""PipelineConf contains pipeline level settings."""
@@ -131,7 +148,6 @@ class PipelineConf():
self.default_pod_node_selector[label_name] = value
return self
def set_image_pull_policy(self, policy: str):
"""Configures the default image pull policy
@@ -196,13 +212,15 @@ class PipelineConf():
"""
self._data_passing_method = value
def get_pipeline_conf():
"""Configure the pipeline level setting to the current pipeline
Note: call the function inside the user defined pipeline function.
"""
return Pipeline.get_default_pipeline().conf
#TODO: Pipeline is in fact an opsgroup, refactor the code.
# TODO: Pipeline is in fact an opsgroup, refactor the code.
class Pipeline():
"""A pipeline contains a list of operators.


@@ -1088,3 +1088,6 @@ implementation:
parameter_arguments_json = template['metadata']['annotations']['pipelines.kubeflow.org/arguments.parameters']
parameter_arguments = json.loads(parameter_arguments_json)
self.assertEqual(set(parameter_arguments.keys()), {'Input 1'})
def test_uri_artifact_passing(self):
self._test_py_compile_yaml('uri_artifacts')


@@ -0,0 +1,105 @@
# Copyright 2020 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Pipeline DSL code for testing URI-based artifact passing."""
from kfp import compiler
from kfp import components
from kfp import dsl
# Patch to make the test result deterministic.
class Coder:
def __init__(self, ):
self._code_id = 0
def get_code(self, ):
self._code_id += 1
return '{code:0{num_chars:}d}'.format(
code=self._code_id,
num_chars=dsl._for_loop.LoopArguments.NUM_CODE_CHARS)
dsl.ParallelFor._get_unique_id_code = Coder().get_code
write_to_gcs = components.load_component_from_text("""
name: Write to GCS
inputs:
- {name: text, type: String, description: 'Content to be written to GCS'}
outputs:
- {name: output_gcs_path, type: GCSPath, description: 'GCS file path'}
implementation:
container:
image: google/cloud-sdk:slim
command:
- sh
- -c
- |
set -e -x
echo "$0" | gsutil cp - "$1"
- {inputValue: text}
- {outputUri: output_gcs_path}
""")
read_from_gcs = components.load_component_from_text("""
name: Read from GCS
inputs:
- {name: input_gcs_path, type: GCSPath, description: 'GCS file path'}
implementation:
container:
image: google/cloud-sdk:slim
command:
- sh
- -c
- |
set -e -x
gsutil cat "$0"
- {inputUri: input_gcs_path}
""")
def flip_coin_op():
"""Flip a coin and output heads or tails randomly."""
return dsl.ContainerOp(
name='Flip coin',
image='python:alpine3.6',
command=['sh', '-c'],
arguments=['python -c "import random; result = \'heads\' if random.randint(0,1) == 0 '
'else \'tails\'; print(result)" | tee /tmp/output'],
file_outputs={'output': '/tmp/output'}
)
@dsl.pipeline(
name='uri-artifact-pipeline',
output_directory='gs://my-bucket/my-output-dir')
def uri_artifact(text='Hello world!'):
task_1 = write_to_gcs(text=text)
task_2 = read_from_gcs(
input_gcs_path=task_1.outputs['output_gcs_path'])
# Test using the URI within a ParallelFor loop.
loop_args = [1, 2, 3, 4]
with dsl.ParallelFor(loop_args) as loop_arg:
loop_task_2 = read_from_gcs(
input_gcs_path=task_1.outputs['output_gcs_path'])
# Test using the URI within a condition.
flip = flip_coin_op()
with dsl.Condition(flip.output == 'heads'):
condition_task_2 = read_from_gcs(
input_gcs_path=task_1.outputs['output_gcs_path'])
if __name__ == '__main__':
compiler.Compiler().compile(uri_artifact, __file__ + '.tar.gz')


@@ -0,0 +1,168 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
generateName: uri-artifact-pipeline-
annotations: {pipelines.kubeflow.org/kfp_sdk_version: 1.2.0, pipelines.kubeflow.org/pipeline_compilation_time: '2020-12-23T10:49:24.672741',
pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "Hello world!",
"name": "text", "optional": true}], "name": "uri-artifact-pipeline"}'}
labels: {pipelines.kubeflow.org/kfp_sdk_version: 1.2.0}
spec:
entrypoint: uri-artifact-pipeline
templates:
- name: condition-2
inputs:
parameters:
- {name: input_gcs_path-producer-pod-id-}
dag:
tasks:
- name: read-from-gcs-3
template: read-from-gcs-3
arguments:
parameters:
- {name: input_gcs_path-producer-pod-id-, value: '{{inputs.parameters.input_gcs_path-producer-pod-id-}}'}
- name: flip-coin
container:
args: ['python -c "import random; result = ''heads'' if random.randint(0,1)
== 0 else ''tails''; print(result)" | tee /tmp/output']
command: [sh, -c]
image: python:alpine3.6
outputs:
parameters:
- name: flip-coin-output
valueFrom: {path: /tmp/output}
artifacts:
- {name: flip-coin-output, path: /tmp/output}
- name: for-loop-for-loop-00000001-1
inputs:
parameters:
- {name: input_gcs_path-producer-pod-id-}
dag:
tasks:
- name: read-from-gcs-2
template: read-from-gcs-2
arguments:
parameters:
- {name: input_gcs_path-producer-pod-id-, value: '{{inputs.parameters.input_gcs_path-producer-pod-id-}}'}
- name: read-from-gcs
container:
args: []
command:
- sh
- -c
- |
set -e -x
gsutil cat "$0"
- gs://my-bucket/my-output-dir/{{workflow.uid}}/{{inputs.parameters.input_gcs_path-producer-pod-id-}}/output_gcs_path
image: google/cloud-sdk:slim
inputs:
parameters:
- {name: input_gcs_path-producer-pod-id-}
metadata:
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
{"command": ["sh", "-c", "set -e -x\ngsutil cat \"$0\"\n", {"inputUri":
"input_gcs_path"}], "image": "google/cloud-sdk:slim"}}, "inputs": [{"description":
"GCS file path", "name": "input_gcs_path", "type": "GCSPath"}], "name":
"Read from GCS"}', pipelines.kubeflow.org/component_ref: '{"digest": "03872dc24d4e846635a06a596d840649f9f9d0744862c7762c0151d0cd65d60b"}'}
- name: read-from-gcs-2
container:
args: []
command:
- sh
- -c
- |
set -e -x
gsutil cat "$0"
- gs://my-bucket/my-output-dir/{{workflow.uid}}/{{inputs.parameters.input_gcs_path-producer-pod-id-}}/output_gcs_path
image: google/cloud-sdk:slim
inputs:
parameters:
- {name: input_gcs_path-producer-pod-id-}
metadata:
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
{"command": ["sh", "-c", "set -e -x\ngsutil cat \"$0\"\n", {"inputUri":
"input_gcs_path"}], "image": "google/cloud-sdk:slim"}}, "inputs": [{"description":
"GCS file path", "name": "input_gcs_path", "type": "GCSPath"}], "name":
"Read from GCS"}', pipelines.kubeflow.org/component_ref: '{"digest": "03872dc24d4e846635a06a596d840649f9f9d0744862c7762c0151d0cd65d60b"}'}
- name: read-from-gcs-3
container:
args: []
command:
- sh
- -c
- |
set -e -x
gsutil cat "$0"
- gs://my-bucket/my-output-dir/{{workflow.uid}}/{{inputs.parameters.input_gcs_path-producer-pod-id-}}/output_gcs_path
image: google/cloud-sdk:slim
inputs:
parameters:
- {name: input_gcs_path-producer-pod-id-}
metadata:
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
{"command": ["sh", "-c", "set -e -x\ngsutil cat \"$0\"\n", {"inputUri":
"input_gcs_path"}], "image": "google/cloud-sdk:slim"}}, "inputs": [{"description":
"GCS file path", "name": "input_gcs_path", "type": "GCSPath"}], "name":
"Read from GCS"}', pipelines.kubeflow.org/component_ref: '{"digest": "03872dc24d4e846635a06a596d840649f9f9d0744862c7762c0151d0cd65d60b"}'}
- name: uri-artifact-pipeline
inputs:
parameters:
- {name: text}
dag:
tasks:
- name: condition-2
template: condition-2
when: '"{{tasks.flip-coin.outputs.parameters.flip-coin-output}}" == "heads"'
dependencies: [flip-coin, write-to-gcs]
arguments:
parameters:
- {name: input_gcs_path-producer-pod-id-, value: '{{tasks.write-to-gcs.outputs.parameters.write-to-gcs-output_gcs_path-producer-pod-id-}}'}
- {name: flip-coin, template: flip-coin}
- name: for-loop-for-loop-00000001-1
template: for-loop-for-loop-00000001-1
dependencies: [write-to-gcs]
arguments:
parameters:
- {name: input_gcs_path-producer-pod-id-, value: '{{tasks.write-to-gcs.outputs.parameters.write-to-gcs-output_gcs_path-producer-pod-id-}}'}
withItems: [1, 2, 3, 4]
- name: read-from-gcs
template: read-from-gcs
dependencies: [write-to-gcs]
arguments:
parameters:
- {name: input_gcs_path-producer-pod-id-, value: '{{tasks.write-to-gcs.outputs.parameters.write-to-gcs-output_gcs_path-producer-pod-id-}}'}
- name: write-to-gcs
template: write-to-gcs
arguments:
parameters:
- {name: text, value: '{{inputs.parameters.text}}'}
- name: write-to-gcs
container:
args: []
command:
- sh
- -c
- |
set -e -x
echo "$0" | gsutil cp - "$1"
- '{{inputs.parameters.text}}'
- gs://my-bucket/my-output-dir/{{workflow.uid}}/{{pod.name}}/output_gcs_path
image: google/cloud-sdk:slim
inputs:
parameters:
- {name: text}
outputs:
parameters:
- {name: write-to-gcs-output_gcs_path-producer-pod-id-, value: '{{pod.name}}'}
metadata:
annotations: {pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
{"command": ["sh", "-c", "set -e -x\necho \"$0\" | gsutil cp - \"$1\"\n",
{"inputValue": "text"}, {"outputUri": "output_gcs_path"}], "image": "google/cloud-sdk:slim"}},
"inputs": [{"description": "Content to be written to GCS", "name": "text",
"type": "String"}], "name": "Write to GCS", "outputs": [{"description":
"GCS file path", "name": "output_gcs_path", "type": "GCSPath"}]}', pipelines.kubeflow.org/component_ref: '{"digest":
"8ec42d2ddcbbce0607719e277bbe8e40f3649a894e46c8e92309951733bc7766"}', pipelines.kubeflow.org/arguments.parameters: '{"text":
"{{inputs.parameters.text}}"}'}
arguments:
parameters:
- {name: text, value: Hello world!}
serviceAccountName: pipeline-runner