[SDK/compiler] Sanitize op name for PipelineParam (#2711)

* sanitize op name for pipeline param

* refactor sanitization to compiler level, and add unittest
This commit is contained in:
Jiaxiao Zheng 2019-12-27 18:01:39 -08:00 committed by Kubernetes Prow Robot
parent ad5b2eb283
commit 358e26adb1
4 changed files with 199 additions and 4 deletions

View File

@ -477,14 +477,19 @@ class Compiler(object):
if pipeline_param.op_name is None:
withparam_value = '{{workflow.parameters.%s}}' % pipeline_param.name
else:
param_name = '%s-%s' % (pipeline_param.op_name, pipeline_param.name)
withparam_value = '{{tasks.%s.outputs.parameters.%s}}' % (pipeline_param.op_name, param_name)
param_name = '%s-%s' % (
sanitize_k8s_name(pipeline_param.op_name), pipeline_param.name)
withparam_value = '{{tasks.%s.outputs.parameters.%s}}' % (
sanitize_k8s_name(pipeline_param.op_name),
param_name)
# these loop args are the output of another task
if 'dependencies' not in task or task['dependencies'] is None:
task['dependencies'] = []
if pipeline_param.op_name not in task['dependencies']:
task['dependencies'].append(pipeline_param.op_name)
if sanitize_k8s_name(
pipeline_param.op_name) not in task['dependencies']:
task['dependencies'].append(
sanitize_k8s_name(pipeline_param.op_name))
task['withParam'] = withparam_value
else:

View File

@ -780,6 +780,9 @@ implementation:
def test_withparam_output_dict(self):
self._test_py_compile_yaml('withparam_output_dict')
def test_withparam_lightweight_out(self):
self._test_py_compile_yaml('loop_over_lightweight_output')
def test_py_input_artifact_raw_value(self):
"""Test pipeline input_artifact_raw_value."""
self._test_py_compile_yaml('input_artifact_raw_value')

View File

@ -0,0 +1,64 @@
#!/usr/bin/env python3
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import kfp
from kfp import dsl
from kfp.dsl import _for_loop
class Coder:
def __init__(self, ):
self._code_id = 0
def get_code(self, ):
self._code_id += 1
return '{code:0{num_chars:}d}'.format(code=self._code_id, num_chars=_for_loop.LoopArguments.NUM_CODE_CHARS)
dsl.ParallelFor._get_unique_id_code = Coder().get_code
produce_op = kfp.components.load_component_from_text('''\
name: Produce list
outputs:
- name: data_list
implementation:
container:
image: busybox
command:
- sh
- -c
- echo "[1, 2, 3]" > "$0"
- outputPath: data_list
''')
consume_op = kfp.components.load_component_from_text('''\
name: Consume data
inputs:
- name: data
implementation:
container:
image: busybox
command:
- echo
- inputValue: data
''')
@dsl.pipeline(
name='Loop over lightweight output',
description='Test pipeline to verify functions of par loop.'
)
def pipeline():
source_task = produce_op()
with dsl.ParallelFor(source_task.output) as item:
consume_op(item)

View File

@ -0,0 +1,123 @@
# Copyright 2019 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"apiVersion": |-
argoproj.io/v1alpha1
"kind": |-
Workflow
"metadata":
"annotations":
"pipelines.kubeflow.org/pipeline_spec": |-
{"description": "Test pipeline to verify functions of par loop.", "name": "Loop over lightweight output"}
"generateName": |-
loop-over-lightweight-output-
"spec":
"arguments":
"parameters": []
"entrypoint": |-
loop-over-lightweight-output
"serviceAccountName": |-
pipeline-runner
"templates":
- "container":
"args": []
"command":
- |-
echo
- |-
{{inputs.parameters.produce-list-data_list}}
"image": |-
busybox
"inputs":
"parameters":
- "name": |-
produce-list-data_list
"metadata":
"annotations":
"pipelines.kubeflow.org/component_spec": |-
{"inputs": [{"name": "data"}], "name": "Consume data"}
"name": |-
consume-data
- "dag":
"tasks":
- "arguments":
"parameters":
- "name": |-
produce-list-data_list
"value": |-
{{inputs.parameters.produce-list-data_list}}
"name": |-
consume-data
"template": |-
consume-data
"inputs":
"parameters":
- "name": |-
produce-list-data_list
"name": |-
for-loop-for-loop-00000001-1
- "dag":
"tasks":
- "arguments":
"parameters":
- "name": |-
produce-list-data_list
"value": |-
{{item}}
"dependencies":
- |-
produce-list
"name": |-
for-loop-for-loop-00000001-1
"template": |-
for-loop-for-loop-00000001-1
"withParam": |-
{{tasks.produce-list.outputs.parameters.produce-list-data_list}}
- "name": |-
produce-list
"template": |-
produce-list
"name": |-
loop-over-lightweight-output
- "container":
"args": []
"command":
- |-
sh
- |-
-c
- |-
echo "[1, 2, 3]" > "$0"
- |-
/tmp/outputs/data_list/data
"image": |-
busybox
"metadata":
"annotations":
"pipelines.kubeflow.org/component_spec": |-
{"name": "Produce list", "outputs": [{"name": "data_list"}]}
"name": |-
produce-list
"outputs":
"artifacts":
- "name": |-
produce-list-data_list
"path": |-
/tmp/outputs/data_list/data
"parameters":
- "name": |-
produce-list-data_list
"valueFrom":
"path": |-
/tmp/outputs/data_list/data