Support data passing pipeline. (#750)
This commit is contained in:
parent
01ed23392b
commit
cb7abf1535
|
|
@ -0,0 +1,167 @@
|
|||
# Copyright 2020 kubeflow.org
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# This pipeline demonstrates and verifies all ways of data passing supported by KFP.
|
||||
|
||||
# %% [markdown]
|
||||
# KFP has simple data passing model.
|
||||
# There are three different kinds of arguments and tw different ways to consume an input argument.
|
||||
# All combinations are supported.
|
||||
|
||||
# Any input can be consumed:
|
||||
# * As value (`inputValue` placeholder)
|
||||
# * As file (`inputPath` placeholder).
|
||||
|
||||
# Input argument can come from:
|
||||
# * Constant value
|
||||
# * Pipeline parameter
|
||||
# * Upstream component output
|
||||
|
||||
# Combining these options there are 6 end-to-end data passing cases, each of which works regardless of type:
|
||||
|
||||
# 1. Pass constant value which gets consumed as value. (Common)
|
||||
# 2. Pass constant value which gets consumed as file. (Often used in test pipelines)
|
||||
# 3. Pass pipeline parameter that gets consumed as value. (Common)
|
||||
# 4. Pass pipeline parameter that gets consumed as file. (Rare. Sometimes used with JSON parameters)
|
||||
# 5. Pass task output that gets consumed as value. (Common)
|
||||
# 6. Pass task output that gets consumed as file. (Common)
|
||||
|
||||
# The only restriction on types is that when both upstream output and downstream input have types, the types must match.
|
||||
|
||||
# %%
|
||||
import kfp
|
||||
from kfp.components import create_component_from_func, InputPath, OutputPath
|
||||
|
||||
|
||||
# Components
|
||||
# Produce
|
||||
@create_component_from_func
|
||||
def produce_anything(data_path: OutputPath()):
|
||||
with open(data_path, "w") as f:
|
||||
f.write("produce_anything")
|
||||
|
||||
|
||||
@create_component_from_func
|
||||
def produce_something(data_path: OutputPath("Something")):
|
||||
with open(data_path, "w") as f:
|
||||
f.write("produce_something")
|
||||
|
||||
|
||||
@create_component_from_func
|
||||
def produce_something2() -> 'Something':
|
||||
return "produce_something2"
|
||||
|
||||
|
||||
@create_component_from_func
|
||||
def produce_string() -> str:
|
||||
return "produce_string"
|
||||
|
||||
|
||||
# Consume as value
|
||||
@create_component_from_func
|
||||
def consume_anything_as_value(data):
|
||||
print("consume_anything_as_value: " + data)
|
||||
|
||||
|
||||
@create_component_from_func
|
||||
def consume_something_as_value(data: "Something"):
|
||||
print("consume_something_as_value: " + data)
|
||||
|
||||
|
||||
@create_component_from_func
|
||||
def consume_string_as_value(data: str):
|
||||
print("consume_string_as_value: " + data)
|
||||
|
||||
|
||||
# Consume as file
|
||||
@create_component_from_func
|
||||
def consume_anything_as_file(data_path: InputPath()):
|
||||
with open(data_path) as f:
|
||||
print("consume_anything_as_file: " + f.read())
|
||||
|
||||
|
||||
@create_component_from_func
|
||||
def consume_something_as_file(data_path: InputPath('Something')):
|
||||
with open(data_path) as f:
|
||||
print("consume_something_as_file: " + f.read())
|
||||
|
||||
|
||||
@create_component_from_func
|
||||
def consume_string_as_file(data_path: InputPath(str)):
|
||||
with open(data_path) as f:
|
||||
print("consume_string_as_file: " + f.read())
|
||||
|
||||
|
||||
# Pipeline
|
||||
@kfp.dsl.pipeline(name='data_passing_pipeline')
|
||||
def data_passing_pipeline(
|
||||
anything_param="anything_param",
|
||||
something_param: "Something" = "something_param",
|
||||
string_param: str = "string_param",
|
||||
):
|
||||
produced_anything = produce_anything().output
|
||||
produced_something = produce_something().output
|
||||
produced_string = produce_string().output
|
||||
|
||||
# Pass constant value; consume as value
|
||||
consume_anything_as_value("constant")
|
||||
consume_something_as_value("constant")
|
||||
consume_string_as_value("constant")
|
||||
|
||||
# Pass constant value; consume as file
|
||||
consume_anything_as_file("constant")
|
||||
consume_something_as_file("constant")
|
||||
consume_string_as_file("constant")
|
||||
|
||||
# Pass pipeline parameter; consume as value
|
||||
consume_anything_as_value(anything_param)
|
||||
consume_anything_as_value(something_param)
|
||||
consume_anything_as_value(string_param)
|
||||
consume_something_as_value(anything_param)
|
||||
consume_something_as_value(something_param)
|
||||
consume_string_as_value(anything_param)
|
||||
consume_string_as_value(string_param)
|
||||
|
||||
# Pass pipeline parameter; consume as file
|
||||
consume_anything_as_file(anything_param)
|
||||
consume_anything_as_file(something_param)
|
||||
consume_anything_as_file(string_param)
|
||||
consume_something_as_file(anything_param)
|
||||
consume_something_as_file(something_param)
|
||||
consume_string_as_file(anything_param)
|
||||
consume_string_as_file(string_param)
|
||||
|
||||
# Pass task output; consume as value
|
||||
consume_anything_as_value(produced_anything)
|
||||
consume_anything_as_value(produced_something)
|
||||
consume_anything_as_value(produced_string)
|
||||
consume_something_as_value(produced_anything)
|
||||
consume_something_as_value(produced_something)
|
||||
consume_string_as_value(produced_anything)
|
||||
consume_string_as_value(produced_string)
|
||||
|
||||
# Pass task output; consume as file
|
||||
consume_anything_as_file(produced_anything)
|
||||
consume_anything_as_file(produced_something)
|
||||
consume_anything_as_file(produced_string)
|
||||
consume_something_as_file(produced_anything)
|
||||
consume_something_as_file(produced_something)
|
||||
consume_string_as_file(produced_anything)
|
||||
consume_string_as_file(produced_string)
|
||||
|
||||
|
||||
from kfp_tekton.compiler import TektonCompiler
|
||||
|
||||
TektonCompiler().compile(data_passing_pipeline, __file__.replace('.py', '.yaml'))
|
||||
|
||||
|
|
@ -92,7 +92,7 @@ def fix_big_data_passing(workflow: dict) -> dict:
|
|||
|
||||
pipeline_template = workflow["spec"]["pipelineSpec"]
|
||||
|
||||
pipelinerun_template = workflow
|
||||
pipelinerun_template = load_annotations(workflow)
|
||||
|
||||
# 1. Index the pipelines to understand how data is being passed and which
|
||||
# inputs/outputs are connected to each other.
|
||||
|
|
@ -367,10 +367,10 @@ def fix_big_data_passing(workflow: dict) -> dict:
|
|||
argument_placeholder_parts[2],
|
||||
sanitize_k8s_name(argument_placeholder_parts[3]))
|
||||
|
||||
workflow = jsonify_annotations(workflow)
|
||||
# Need to confirm:
|
||||
# I didn't find the use cases to support workflow parameter consumed as artifacts downstream in tekton.
|
||||
# Whether this case need to be supporting?
|
||||
|
||||
clean_up_empty_workflow_structures(workflow)
|
||||
return workflow
|
||||
|
||||
|
|
@ -490,19 +490,17 @@ def big_data_passing_tasks(prname: str, task: dict, pipelinerun_template: dict,
|
|||
task_name, task_name, task_output.get('name'))
|
||||
task['taskSpec'] = replace_big_data_placeholder(
|
||||
task.get("taskSpec", {}), placeholder, workspaces_parameter)
|
||||
pipelinerun_template['metadata']['annotations'] = replace_big_data_placeholder(
|
||||
pipelinerun_template['metadata']['annotations'], placeholder, workspaces_parameter)
|
||||
artifact_items = pipelinerun_template['metadata']['annotations']['tekton.dev/artifact_items']
|
||||
artifact_items[task['name']] = replace_big_data_placeholder(
|
||||
artifact_items[task['name']], placeholder, workspaces_parameter)
|
||||
pipelinerun_template['metadata']['annotations']['tekton.dev/artifact_items'] = \
|
||||
artifact_items
|
||||
|
||||
# Remove artifacts outputs from results
|
||||
task.get("taskSpec", {})['results'] = [
|
||||
result for result in task_outputs
|
||||
if (task_name, result.get('name')) not in outputs_tasks
|
||||
]
|
||||
|
||||
# Data passing for task inputs
|
||||
task_spec = task.get('taskSpec', {})
|
||||
task_params = task_spec.get('params', [])
|
||||
task_artifacts = task_spec.get('artifacts', [])
|
||||
|
||||
# Data passing for task inputs
|
||||
for task_param in task_params:
|
||||
if (task_name, task_param.get('name')) in inputs_tasks:
|
||||
if not task_spec.setdefault('workspaces', []):
|
||||
|
|
@ -535,6 +533,34 @@ def big_data_passing_tasks(prname: str, task: dict, pipelinerun_template: dict,
|
|||
task_artifact['raw']['data'] = param_value
|
||||
task = input_artifacts_tasks_pr_params(task, task_artifact)
|
||||
|
||||
# If a task produces a result and artifact, add a step to copy artifact to results.
|
||||
artifact_items = pipelinerun_template['metadata']['annotations']['tekton.dev/artifact_items']
|
||||
add_copy_results_artifacts_step = False
|
||||
if task.get("taskSpec", {}):
|
||||
if task_spec.get('results', []):
|
||||
copy_results_artifact_step = _get_base_step('copy-results-artifacts')
|
||||
copy_results_artifact_step['onError'] = 'continue' # supported by v0.27+ of tekton.
|
||||
copy_results_artifact_step['script'] += 'TOTAL_SIZE=0\n'
|
||||
for result in task_spec['results']:
|
||||
if task['name'] in artifact_items:
|
||||
artifact_i = artifact_items[task['name']]
|
||||
for index, artifact_tuple in enumerate(artifact_i):
|
||||
artifact_name, artifact = artifact_tuple
|
||||
src = artifact
|
||||
dst = '$(results.%s.path)' % sanitize_k8s_name(result['name'])
|
||||
if artifact_name == result['name'] and src != dst:
|
||||
add_copy_results_artifacts_step = True
|
||||
copy_results_artifact_step['script'] += (
|
||||
'ARTIFACT_SIZE=`wc -c %s | awk \'{print $1}\'`\n' % src +
|
||||
'TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)\n' +
|
||||
'touch ' + dst + '\n' + # create an empty file by default.
|
||||
'if [[ $TOTAL_SIZE -lt 3072 ]]; then\n' +
|
||||
' cp ' + src + ' ' + dst + '\n' +
|
||||
'fi\n'
|
||||
)
|
||||
if add_copy_results_artifacts_step:
|
||||
task['taskSpec']['steps'].append(copy_results_artifact_step)
|
||||
|
||||
# Remove artifacts parameter from params
|
||||
task.get("taskSpec", {})['params'] = [
|
||||
param for param in task_spec.get('params', [])
|
||||
|
|
@ -610,3 +636,17 @@ def clean_up_empty_workflow_structures(workflow: list):
|
|||
del task['artifacts']
|
||||
if not workflow['spec']['pipelineSpec'].setdefault('finally', []):
|
||||
del workflow['spec']['pipelineSpec']['finally']
|
||||
|
||||
|
||||
def load_annotations(template: dict):
|
||||
artifact_items = json.loads(
|
||||
str(template['metadata']['annotations']['tekton.dev/artifact_items']))
|
||||
template['metadata']['annotations']['tekton.dev/artifact_items'] = \
|
||||
artifact_items
|
||||
return template
|
||||
|
||||
|
||||
def jsonify_annotations(template: dict):
|
||||
template['metadata']['annotations']['tekton.dev/artifact_items'] = \
|
||||
json.dumps(template['metadata']['annotations']['tekton.dev/artifact_items'])
|
||||
return template
|
||||
|
|
|
|||
|
|
@ -131,7 +131,14 @@ class TestTektonCompiler(unittest.TestCase):
|
|||
Test compiling a pipeline_param_as_file workflow.
|
||||
"""
|
||||
from .testdata.data_passing_pipeline_param_as_file import data_passing_pipeline
|
||||
self._test_pipeline_workflow(data_passing_pipeline, 'data_passing_pipeline_param_as_file.yaml')
|
||||
self._test_pipeline_workflow(data_passing_pipeline, 'data_passing_pipeline_param_as_file.yaml', skip_noninlined=True)
|
||||
|
||||
def test_data_passing_pipeline_complete(self):
|
||||
"""
|
||||
Test compiling a pipeline_param_as_file workflow.
|
||||
"""
|
||||
from .testdata.data_passing_pipeline_complete import data_passing_pipeline
|
||||
self._test_pipeline_workflow(data_passing_pipeline, 'data_passing_pipeline_complete.yaml', skip_noninlined=True)
|
||||
|
||||
def test_recur_nested_workflow(self):
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ metadata:
|
|||
tekton.dev/artifact_bucket: mlpipeline
|
||||
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
|
||||
tekton.dev/artifact_endpoint_scheme: http://
|
||||
tekton.dev/artifact_items: '{"gen-params": [["Output", "$(workspaces.sum-numbers.path)/sum-numbers-Output"]],
|
||||
tekton.dev/artifact_items: '{"gen-params": [["Output", "$(results.output.path)"]],
|
||||
"print-params": [], "print-text": [], "print-text-2": [], "print-text-3": [],
|
||||
"print-text-4": [], "print-text-5": [], "repeat-line": [["output_text", "$(workspaces.repeat-line.path)/repeat-line-output_text"]],
|
||||
"split-text-lines": [["even_lines", "$(workspaces.split-text-lines.path)/split-text-lines-even_lines"],
|
||||
|
|
@ -89,6 +89,22 @@ spec:
|
|||
|
||||
_outputs = repeat_line(**_parsed_args)
|
||||
image: python:3.7
|
||||
- image: busybox
|
||||
name: copy-results-artifacts
|
||||
script: |
|
||||
#!/bin/sh
|
||||
set -exo pipefail
|
||||
TOTAL_SIZE=0
|
||||
ARTIFACT_SIZE=`wc -c $(workspaces.repeat-line.path)/repeat-line-output_text | awk '{print $1}'`
|
||||
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
|
||||
touch $(results.output-text.path)
|
||||
if [[ $TOTAL_SIZE -lt 3072 ]]; then
|
||||
cp $(workspaces.repeat-line.path)/repeat-line-output_text $(results.output-text.path)
|
||||
fi
|
||||
onError: continue
|
||||
results:
|
||||
- name: output-text
|
||||
description: /tmp/outputs/output_text/data
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
|
|
@ -244,6 +260,30 @@ spec:
|
|||
|
||||
_outputs = split_text_lines(**_parsed_args)
|
||||
image: python:3.7
|
||||
- image: busybox
|
||||
name: copy-results-artifacts
|
||||
script: |
|
||||
#!/bin/sh
|
||||
set -exo pipefail
|
||||
TOTAL_SIZE=0
|
||||
ARTIFACT_SIZE=`wc -c $(workspaces.split-text-lines.path)/split-text-lines-odd_lines | awk '{print $1}'`
|
||||
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
|
||||
touch $(results.odd-lines.path)
|
||||
if [[ $TOTAL_SIZE -lt 3072 ]]; then
|
||||
cp $(workspaces.split-text-lines.path)/split-text-lines-odd_lines $(results.odd-lines.path)
|
||||
fi
|
||||
ARTIFACT_SIZE=`wc -c $(workspaces.split-text-lines.path)/split-text-lines-even_lines | awk '{print $1}'`
|
||||
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
|
||||
touch $(results.even-lines.path)
|
||||
if [[ $TOTAL_SIZE -lt 3072 ]]; then
|
||||
cp $(workspaces.split-text-lines.path)/split-text-lines-even_lines $(results.even-lines.path)
|
||||
fi
|
||||
onError: continue
|
||||
results:
|
||||
- name: odd-lines
|
||||
description: /tmp/outputs/odd_lines/data
|
||||
- name: even-lines
|
||||
description: /tmp/outputs/even_lines/data
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
|
|
@ -439,6 +479,22 @@ spec:
|
|||
|
||||
_outputs = write_numbers(**_parsed_args)
|
||||
image: python:3.7
|
||||
- image: busybox
|
||||
name: copy-results-artifacts
|
||||
script: |
|
||||
#!/bin/sh
|
||||
set -exo pipefail
|
||||
TOTAL_SIZE=0
|
||||
ARTIFACT_SIZE=`wc -c $(workspaces.write-numbers.path)/write-numbers-numbers | awk '{print $1}'`
|
||||
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
|
||||
touch $(results.numbers.path)
|
||||
if [[ $TOTAL_SIZE -lt 3072 ]]; then
|
||||
cp $(workspaces.write-numbers.path)/write-numbers-numbers $(results.numbers.path)
|
||||
fi
|
||||
onError: continue
|
||||
results:
|
||||
- name: numbers
|
||||
description: /tmp/outputs/numbers/data
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
|
|
@ -586,6 +642,22 @@ spec:
|
|||
with open(output_file, 'w') as f:
|
||||
f.write(_output_serializers[idx](_outputs[idx]))
|
||||
image: python:3.7
|
||||
- image: busybox
|
||||
name: copy-results-artifacts
|
||||
script: |
|
||||
#!/bin/sh
|
||||
set -exo pipefail
|
||||
TOTAL_SIZE=0
|
||||
ARTIFACT_SIZE=`wc -c $(workspaces.sum-numbers.path)/sum-numbers-Output | awk '{print $1}'`
|
||||
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
|
||||
touch $(results.output.path)
|
||||
if [[ $TOTAL_SIZE -lt 3072 ]]; then
|
||||
cp $(workspaces.sum-numbers.path)/sum-numbers-Output $(results.output.path)
|
||||
fi
|
||||
onError: continue
|
||||
results:
|
||||
- name: output
|
||||
description: /tmp/outputs/Output/data
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ metadata:
|
|||
tekton.dev/artifact_bucket: mlpipeline
|
||||
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
|
||||
tekton.dev/artifact_endpoint_scheme: http://
|
||||
tekton.dev/artifact_items: '{"gen-params": [["Output", "$(workspaces.sum-numbers.path)/sum-numbers-Output"]],
|
||||
tekton.dev/artifact_items: '{"gen-params": [["Output", "$(results.output.path)"]],
|
||||
"print-params": [], "print-text": [], "print-text-2": [], "print-text-3": [],
|
||||
"print-text-4": [], "print-text-5": [], "repeat-line": [["output_text", "$(workspaces.repeat-line.path)/repeat-line-output_text"]],
|
||||
"split-text-lines": [["even_lines", "$(workspaces.split-text-lines.path)/split-text-lines-even_lines"],
|
||||
|
|
@ -89,6 +89,22 @@ spec:
|
|||
|
||||
_outputs = repeat_line(**_parsed_args)
|
||||
image: python:3.7
|
||||
- image: busybox
|
||||
name: copy-results-artifacts
|
||||
script: |
|
||||
#!/bin/sh
|
||||
set -exo pipefail
|
||||
TOTAL_SIZE=0
|
||||
ARTIFACT_SIZE=`wc -c $(workspaces.repeat-line.path)/repeat-line-output_text | awk '{print $1}'`
|
||||
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
|
||||
touch $(results.output-text.path)
|
||||
if [[ $TOTAL_SIZE -lt 3072 ]]; then
|
||||
cp $(workspaces.repeat-line.path)/repeat-line-output_text $(results.output-text.path)
|
||||
fi
|
||||
onError: continue
|
||||
results:
|
||||
- name: output-text
|
||||
description: /tmp/outputs/output_text/data
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
|
|
@ -244,6 +260,30 @@ spec:
|
|||
|
||||
_outputs = split_text_lines(**_parsed_args)
|
||||
image: python:3.7
|
||||
- image: busybox
|
||||
name: copy-results-artifacts
|
||||
script: |
|
||||
#!/bin/sh
|
||||
set -exo pipefail
|
||||
TOTAL_SIZE=0
|
||||
ARTIFACT_SIZE=`wc -c $(workspaces.split-text-lines.path)/split-text-lines-odd_lines | awk '{print $1}'`
|
||||
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
|
||||
touch $(results.odd-lines.path)
|
||||
if [[ $TOTAL_SIZE -lt 3072 ]]; then
|
||||
cp $(workspaces.split-text-lines.path)/split-text-lines-odd_lines $(results.odd-lines.path)
|
||||
fi
|
||||
ARTIFACT_SIZE=`wc -c $(workspaces.split-text-lines.path)/split-text-lines-even_lines | awk '{print $1}'`
|
||||
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
|
||||
touch $(results.even-lines.path)
|
||||
if [[ $TOTAL_SIZE -lt 3072 ]]; then
|
||||
cp $(workspaces.split-text-lines.path)/split-text-lines-even_lines $(results.even-lines.path)
|
||||
fi
|
||||
onError: continue
|
||||
results:
|
||||
- name: odd-lines
|
||||
description: /tmp/outputs/odd_lines/data
|
||||
- name: even-lines
|
||||
description: /tmp/outputs/even_lines/data
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
|
|
@ -439,6 +479,22 @@ spec:
|
|||
|
||||
_outputs = write_numbers(**_parsed_args)
|
||||
image: python:3.7
|
||||
- image: busybox
|
||||
name: copy-results-artifacts
|
||||
script: |
|
||||
#!/bin/sh
|
||||
set -exo pipefail
|
||||
TOTAL_SIZE=0
|
||||
ARTIFACT_SIZE=`wc -c $(workspaces.write-numbers.path)/write-numbers-numbers | awk '{print $1}'`
|
||||
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
|
||||
touch $(results.numbers.path)
|
||||
if [[ $TOTAL_SIZE -lt 3072 ]]; then
|
||||
cp $(workspaces.write-numbers.path)/write-numbers-numbers $(results.numbers.path)
|
||||
fi
|
||||
onError: continue
|
||||
results:
|
||||
- name: numbers
|
||||
description: /tmp/outputs/numbers/data
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
|
|
@ -586,6 +642,22 @@ spec:
|
|||
with open(output_file, 'w') as f:
|
||||
f.write(_output_serializers[idx](_outputs[idx]))
|
||||
image: python:3.7
|
||||
- image: busybox
|
||||
name: copy-results-artifacts
|
||||
script: |
|
||||
#!/bin/sh
|
||||
set -exo pipefail
|
||||
TOTAL_SIZE=0
|
||||
ARTIFACT_SIZE=`wc -c $(workspaces.sum-numbers.path)/sum-numbers-Output | awk '{print $1}'`
|
||||
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
|
||||
touch $(results.output.path)
|
||||
if [[ $TOTAL_SIZE -lt 3072 ]]; then
|
||||
cp $(workspaces.sum-numbers.path)/sum-numbers-Output $(results.output.path)
|
||||
fi
|
||||
onError: continue
|
||||
results:
|
||||
- name: output
|
||||
description: /tmp/outputs/Output/data
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
|
|
|
|||
|
|
@ -79,6 +79,22 @@ spec:
|
|||
|
||||
_outputs = produce_dir_with_files_python_op(**_parsed_args)
|
||||
image: python:3.7
|
||||
- image: busybox
|
||||
name: copy-results-artifacts
|
||||
script: |
|
||||
#!/bin/sh
|
||||
set -exo pipefail
|
||||
TOTAL_SIZE=0
|
||||
ARTIFACT_SIZE=`wc -c $(workspaces.produce-dir-with-files-python-op.path)/produce-dir-with-files-python-op-output_dir | awk '{print $1}'`
|
||||
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
|
||||
touch $(results.output-dir.path)
|
||||
if [[ $TOTAL_SIZE -lt 3072 ]]; then
|
||||
cp $(workspaces.produce-dir-with-files-python-op.path)/produce-dir-with-files-python-op-output_dir $(results.output-dir.path)
|
||||
fi
|
||||
onError: continue
|
||||
results:
|
||||
- name: output-dir
|
||||
description: /tmp/outputs/output_dir/data
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
|
|
@ -127,6 +143,22 @@ spec:
|
|||
- subdir
|
||||
- $(workspaces.get-subdirectory.path)/get-subdirectory-Subdir
|
||||
image: alpine
|
||||
- image: busybox
|
||||
name: copy-results-artifacts
|
||||
script: |
|
||||
#!/bin/sh
|
||||
set -exo pipefail
|
||||
TOTAL_SIZE=0
|
||||
ARTIFACT_SIZE=`wc -c $(workspaces.get-subdirectory.path)/get-subdirectory-Subdir | awk '{print $1}'`
|
||||
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
|
||||
touch $(results.subdir.path)
|
||||
if [[ $TOTAL_SIZE -lt 3072 ]]; then
|
||||
cp $(workspaces.get-subdirectory.path)/get-subdirectory-Subdir $(results.subdir.path)
|
||||
fi
|
||||
onError: continue
|
||||
results:
|
||||
- name: subdir
|
||||
description: /tmp/outputs/Subdir/data
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
|
|
|
|||
|
|
@ -79,6 +79,22 @@ spec:
|
|||
|
||||
_outputs = produce_dir_with_files_python_op(**_parsed_args)
|
||||
image: python:3.7
|
||||
- image: busybox
|
||||
name: copy-results-artifacts
|
||||
script: |
|
||||
#!/bin/sh
|
||||
set -exo pipefail
|
||||
TOTAL_SIZE=0
|
||||
ARTIFACT_SIZE=`wc -c $(workspaces.produce-dir-with-files-python-op.path)/produce-dir-with-files-python-op-output_dir | awk '{print $1}'`
|
||||
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
|
||||
touch $(results.output-dir.path)
|
||||
if [[ $TOTAL_SIZE -lt 3072 ]]; then
|
||||
cp $(workspaces.produce-dir-with-files-python-op.path)/produce-dir-with-files-python-op-output_dir $(results.output-dir.path)
|
||||
fi
|
||||
onError: continue
|
||||
results:
|
||||
- name: output-dir
|
||||
description: /tmp/outputs/output_dir/data
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
|
|
@ -127,6 +143,22 @@ spec:
|
|||
- subdir
|
||||
- $(workspaces.get-subdirectory.path)/get-subdirectory-Subdir
|
||||
image: alpine
|
||||
- image: busybox
|
||||
name: copy-results-artifacts
|
||||
script: |
|
||||
#!/bin/sh
|
||||
set -exo pipefail
|
||||
TOTAL_SIZE=0
|
||||
ARTIFACT_SIZE=`wc -c $(workspaces.get-subdirectory.path)/get-subdirectory-Subdir | awk '{print $1}'`
|
||||
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
|
||||
touch $(results.subdir.path)
|
||||
if [[ $TOTAL_SIZE -lt 3072 ]]; then
|
||||
cp $(workspaces.get-subdirectory.path)/get-subdirectory-Subdir $(results.subdir.path)
|
||||
fi
|
||||
onError: continue
|
||||
results:
|
||||
- name: subdir
|
||||
description: /tmp/outputs/Subdir/data
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
|
|
|
|||
|
|
@ -13,7 +13,6 @@
|
|||
# limitations under the License.
|
||||
|
||||
from kfp import dsl, components
|
||||
from tests.compiler.testdata.custom_task_ref import CUSTOM_STR
|
||||
|
||||
MY_CUSTOM_TASK_IMAGE_NAME = "veryunique/image:latest"
|
||||
from kfp_tekton.tekton import TEKTON_CUSTOM_TASK_IMAGES
|
||||
|
|
|
|||
|
|
@ -0,0 +1,168 @@
|
|||
# Copyright 2020 kubeflow.org
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# This pipeline demonstrates and verifies all ways of data passing supported by KFP.
|
||||
|
||||
# %% [markdown]
|
||||
# KFP has simple data passing model.
|
||||
# There are three different kinds of arguments and tw different ways to consume an input argument.
|
||||
# All combinations are supported.
|
||||
|
||||
# Any input can be consumed:
|
||||
# * As value (`inputValue` placeholder)
|
||||
# * As file (`inputPath` placeholder).
|
||||
|
||||
# Input argument can come from:
|
||||
# * Constant value
|
||||
# * Pipeline parameter
|
||||
# * Upstream component output
|
||||
|
||||
# Combining these options there are 6 end-to-end data passing cases, each of which works regardless of type:
|
||||
|
||||
# 1. Pass constant value which gets consumed as value. (Common)
|
||||
# 2. Pass constant value which gets consumed as file. (Often used in test pipelines)
|
||||
# 3. Pass pipeline parameter that gets consumed as value. (Common)
|
||||
# 4. Pass pipeline parameter that gets consumed as file. (Rare. Sometimes used with JSON parameters)
|
||||
# 5. Pass task output that gets consumed as value. (Common)
|
||||
# 6. Pass task output that gets consumed as file. (Common)
|
||||
|
||||
# The only restriction on types is that when both upstream output and downstream input have types, the types must match.
|
||||
|
||||
# %%
|
||||
|
||||
import kfp
|
||||
from kfp.components import create_component_from_func, InputPath, OutputPath
|
||||
from kfp_tekton.compiler import TektonCompiler
|
||||
|
||||
|
||||
# Components
|
||||
# Produce
|
||||
@create_component_from_func
|
||||
def produce_anything(data_path: OutputPath()):
|
||||
with open(data_path, "w") as f:
|
||||
f.write("produce_anything")
|
||||
|
||||
|
||||
@create_component_from_func
|
||||
def produce_something(data_path: OutputPath("Something")):
|
||||
with open(data_path, "w") as f:
|
||||
f.write("produce_something")
|
||||
|
||||
|
||||
@create_component_from_func
|
||||
def produce_something2() -> 'Something':
|
||||
return "produce_something2"
|
||||
|
||||
|
||||
@create_component_from_func
|
||||
def produce_string() -> str:
|
||||
return "produce_string"
|
||||
|
||||
|
||||
# Consume as value
|
||||
@create_component_from_func
|
||||
def consume_anything_as_value(data):
|
||||
print("consume_anything_as_value: " + data)
|
||||
|
||||
|
||||
@create_component_from_func
|
||||
def consume_something_as_value(data: "Something"):
|
||||
print("consume_something_as_value: " + data)
|
||||
|
||||
|
||||
@create_component_from_func
|
||||
def consume_string_as_value(data: str):
|
||||
print("consume_string_as_value: " + data)
|
||||
|
||||
|
||||
# Consume as file
|
||||
@create_component_from_func
|
||||
def consume_anything_as_file(data_path: InputPath()):
|
||||
with open(data_path) as f:
|
||||
print("consume_anything_as_file: " + f.read())
|
||||
|
||||
|
||||
@create_component_from_func
|
||||
def consume_something_as_file(data_path: InputPath('Something')):
|
||||
with open(data_path) as f:
|
||||
print("consume_something_as_file: " + f.read())
|
||||
|
||||
|
||||
@create_component_from_func
|
||||
def consume_string_as_file(data_path: InputPath(str)):
|
||||
with open(data_path) as f:
|
||||
print("consume_string_as_file: " + f.read())
|
||||
|
||||
|
||||
# Pipeline
|
||||
@kfp.dsl.pipeline(name='data_passing_pipeline')
|
||||
def data_passing_pipeline(
|
||||
anything_param="anything_param",
|
||||
something_param: "Something" = "something_param",
|
||||
string_param: str = "string_param",
|
||||
):
|
||||
produced_anything = produce_anything().output
|
||||
produced_something = produce_something().output
|
||||
produced_string = produce_string().output
|
||||
|
||||
# Pass constant value; consume as value
|
||||
consume_anything_as_value("constant")
|
||||
consume_something_as_value("constant")
|
||||
consume_string_as_value("constant")
|
||||
|
||||
# Pass constant value; consume as file
|
||||
consume_anything_as_file("constant")
|
||||
consume_something_as_file("constant")
|
||||
consume_string_as_file("constant")
|
||||
|
||||
# Pass pipeline parameter; consume as value
|
||||
consume_anything_as_value(anything_param)
|
||||
consume_anything_as_value(something_param)
|
||||
consume_anything_as_value(string_param)
|
||||
consume_something_as_value(anything_param)
|
||||
consume_something_as_value(something_param)
|
||||
consume_string_as_value(anything_param)
|
||||
consume_string_as_value(string_param)
|
||||
|
||||
# Pass pipeline parameter; consume as file
|
||||
consume_anything_as_file(anything_param)
|
||||
consume_anything_as_file(something_param)
|
||||
consume_anything_as_file(string_param)
|
||||
consume_something_as_file(anything_param)
|
||||
consume_something_as_file(something_param)
|
||||
consume_string_as_file(anything_param)
|
||||
consume_string_as_file(string_param)
|
||||
|
||||
# Pass task output; consume as value
|
||||
consume_anything_as_value(produced_anything)
|
||||
consume_anything_as_value(produced_something)
|
||||
consume_anything_as_value(produced_string)
|
||||
consume_something_as_value(produced_anything)
|
||||
consume_something_as_value(produced_something)
|
||||
consume_string_as_value(produced_anything)
|
||||
consume_string_as_value(produced_string)
|
||||
|
||||
# Pass task output; consume as file
|
||||
consume_anything_as_file(produced_anything)
|
||||
consume_anything_as_file(produced_something)
|
||||
consume_anything_as_file(produced_string)
|
||||
consume_something_as_file(produced_anything)
|
||||
consume_something_as_file(produced_something)
|
||||
consume_string_as_file(produced_anything)
|
||||
consume_string_as_file(produced_string)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
kfp_endpoint = None
|
||||
TektonCompiler().compile(data_passing_pipeline, __file__.replace('.py', '.yaml'))
|
||||
File diff suppressed because it is too large
Load Diff
|
|
@ -1,454 +0,0 @@
|
|||
# Copyright 2021 kubeflow.org
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
apiVersion: tekton.dev/v1beta1
|
||||
kind: PipelineRun
|
||||
metadata:
|
||||
name: data-passing-pipeline
|
||||
annotations:
|
||||
tekton.dev/output_artifacts: '{}'
|
||||
tekton.dev/input_artifacts: '{}'
|
||||
tekton.dev/artifact_bucket: mlpipeline
|
||||
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
|
||||
tekton.dev/artifact_endpoint_scheme: http://
|
||||
tekton.dev/artifact_items: '{"consume-anything-as-file": [], "consume-anything-as-file-2":
|
||||
[], "consume-anything-as-file-3": [], "consume-something-as-file": [], "consume-something-as-file-2":
|
||||
[], "consume-string-as-file": [], "consume-string-as-file-2": []}'
|
||||
sidecar.istio.io/inject: "false"
|
||||
pipelines.kubeflow.org/pipeline_spec: '{"inputs": [{"default": "anything_param",
|
||||
"name": "anything_param", "optional": true}, {"default": "something_param",
|
||||
"name": "something_param", "optional": true, "type": "Something"}, {"default":
|
||||
"string_param", "name": "string_param", "optional": true, "type": "String"}],
|
||||
"name": "data_passing_pipeline"}'
|
||||
spec:
|
||||
params:
|
||||
- name: anything_param
|
||||
value: anything_param
|
||||
- name: something_param
|
||||
value: something_param
|
||||
- name: string_param
|
||||
value: string_param
|
||||
pipelineSpec:
|
||||
params:
|
||||
- name: anything_param
|
||||
default: anything_param
|
||||
- name: something_param
|
||||
default: something_param
|
||||
- name: string_param
|
||||
default: string_param
|
||||
tasks:
|
||||
- name: consume-anything-as-file
|
||||
taskSpec:
|
||||
steps:
|
||||
- image: busybox
|
||||
name: copy-inputs
|
||||
script: |
|
||||
#!/bin/sh
|
||||
set -exo pipefail
|
||||
echo -n "anything_param" > $(workspaces.consume-anything-as-file.path)/anything_param
|
||||
- name: main
|
||||
args:
|
||||
- --data
|
||||
- $(workspaces.consume-anything-as-file.path)/anything_param
|
||||
command:
|
||||
- sh
|
||||
- -ec
|
||||
- |
|
||||
program_path=$(mktemp)
|
||||
printf "%s" "$0" > "$program_path"
|
||||
python3 -u "$program_path" "$@"
|
||||
- |
|
||||
def consume_anything_as_file(data_path):
|
||||
with open(data_path) as f:
|
||||
print("consume_anything_as_file: " + f.read())
|
||||
|
||||
import argparse
|
||||
_parser = argparse.ArgumentParser(prog='Consume anything as file', description='')
|
||||
_parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS)
|
||||
_parsed_args = vars(_parser.parse_args())
|
||||
|
||||
_outputs = consume_anything_as_file(**_parsed_args)
|
||||
image: python:3.7
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
pipelines.kubeflow.org/generation: ''
|
||||
pipelines.kubeflow.org/cache_enabled: "true"
|
||||
annotations:
|
||||
pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
|
||||
{"args": ["--data", {"inputPath": "data"}], "command": ["sh", "-ec",
|
||||
"program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3
|
||||
-u \"$program_path\" \"$@\"\n", "def consume_anything_as_file(data_path):\n with
|
||||
open(data_path) as f:\n print(\"consume_anything_as_file: \"
|
||||
+ f.read())\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Consume
|
||||
anything as file'', description='''')\n_parser.add_argument(\"--data\",
|
||||
dest=\"data_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args
|
||||
= vars(_parser.parse_args())\n\n_outputs = consume_anything_as_file(**_parsed_args)\n"],
|
||||
"image": "python:3.7"}}, "inputs": [{"name": "data"}], "name": "Consume
|
||||
anything as file"}'
|
||||
tekton.dev/template: ''
|
||||
workspaces:
|
||||
- name: consume-anything-as-file
|
||||
timeout: 0s
|
||||
workspaces:
|
||||
- name: consume-anything-as-file
|
||||
workspace: data-passing-pipeline
|
||||
- name: consume-anything-as-file-2
|
||||
taskSpec:
|
||||
steps:
|
||||
- image: busybox
|
||||
name: copy-inputs
|
||||
script: |
|
||||
#!/bin/sh
|
||||
set -exo pipefail
|
||||
echo -n "something_param" > $(workspaces.consume-anything-as-file-2.path)/something_param
|
||||
- name: main
|
||||
args:
|
||||
- --data
|
||||
- $(workspaces.consume-anything-as-file-2.path)/something_param
|
||||
command:
|
||||
- sh
|
||||
- -ec
|
||||
- |
|
||||
program_path=$(mktemp)
|
||||
printf "%s" "$0" > "$program_path"
|
||||
python3 -u "$program_path" "$@"
|
||||
- |
|
||||
def consume_anything_as_file(data_path):
|
||||
with open(data_path) as f:
|
||||
print("consume_anything_as_file: " + f.read())
|
||||
|
||||
import argparse
|
||||
_parser = argparse.ArgumentParser(prog='Consume anything as file', description='')
|
||||
_parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS)
|
||||
_parsed_args = vars(_parser.parse_args())
|
||||
|
||||
_outputs = consume_anything_as_file(**_parsed_args)
|
||||
image: python:3.7
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
pipelines.kubeflow.org/generation: ''
|
||||
pipelines.kubeflow.org/cache_enabled: "true"
|
||||
annotations:
|
||||
pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
|
||||
{"args": ["--data", {"inputPath": "data"}], "command": ["sh", "-ec",
|
||||
"program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3
|
||||
-u \"$program_path\" \"$@\"\n", "def consume_anything_as_file(data_path):\n with
|
||||
open(data_path) as f:\n print(\"consume_anything_as_file: \"
|
||||
+ f.read())\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Consume
|
||||
anything as file'', description='''')\n_parser.add_argument(\"--data\",
|
||||
dest=\"data_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args
|
||||
= vars(_parser.parse_args())\n\n_outputs = consume_anything_as_file(**_parsed_args)\n"],
|
||||
"image": "python:3.7"}}, "inputs": [{"name": "data"}], "name": "Consume
|
||||
anything as file"}'
|
||||
tekton.dev/template: ''
|
||||
workspaces:
|
||||
- name: consume-anything-as-file-2
|
||||
timeout: 0s
|
||||
workspaces:
|
||||
- name: consume-anything-as-file-2
|
||||
workspace: data-passing-pipeline
|
||||
- name: consume-anything-as-file-3
|
||||
taskSpec:
|
||||
steps:
|
||||
- image: busybox
|
||||
name: copy-inputs
|
||||
script: |
|
||||
#!/bin/sh
|
||||
set -exo pipefail
|
||||
echo -n "string_param" > $(workspaces.consume-anything-as-file-3.path)/string_param
|
||||
- name: main
|
||||
args:
|
||||
- --data
|
||||
- $(workspaces.consume-anything-as-file-3.path)/string_param
|
||||
command:
|
||||
- sh
|
||||
- -ec
|
||||
- |
|
||||
program_path=$(mktemp)
|
||||
printf "%s" "$0" > "$program_path"
|
||||
python3 -u "$program_path" "$@"
|
||||
- |
|
||||
def consume_anything_as_file(data_path):
|
||||
with open(data_path) as f:
|
||||
print("consume_anything_as_file: " + f.read())
|
||||
|
||||
import argparse
|
||||
_parser = argparse.ArgumentParser(prog='Consume anything as file', description='')
|
||||
_parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS)
|
||||
_parsed_args = vars(_parser.parse_args())
|
||||
|
||||
_outputs = consume_anything_as_file(**_parsed_args)
|
||||
image: python:3.7
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
pipelines.kubeflow.org/generation: ''
|
||||
pipelines.kubeflow.org/cache_enabled: "true"
|
||||
annotations:
|
||||
pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
|
||||
{"args": ["--data", {"inputPath": "data"}], "command": ["sh", "-ec",
|
||||
"program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3
|
||||
-u \"$program_path\" \"$@\"\n", "def consume_anything_as_file(data_path):\n with
|
||||
open(data_path) as f:\n print(\"consume_anything_as_file: \"
|
||||
+ f.read())\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Consume
|
||||
anything as file'', description='''')\n_parser.add_argument(\"--data\",
|
||||
dest=\"data_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args
|
||||
= vars(_parser.parse_args())\n\n_outputs = consume_anything_as_file(**_parsed_args)\n"],
|
||||
"image": "python:3.7"}}, "inputs": [{"name": "data"}], "name": "Consume
|
||||
anything as file"}'
|
||||
tekton.dev/template: ''
|
||||
workspaces:
|
||||
- name: consume-anything-as-file-3
|
||||
timeout: 0s
|
||||
workspaces:
|
||||
- name: consume-anything-as-file-3
|
||||
workspace: data-passing-pipeline
|
||||
- name: consume-something-as-file
|
||||
taskSpec:
|
||||
steps:
|
||||
- image: busybox
|
||||
name: copy-inputs
|
||||
script: |
|
||||
#!/bin/sh
|
||||
set -exo pipefail
|
||||
echo -n "anything_param" > $(workspaces.consume-something-as-file.path)/anything_param
|
||||
- name: main
|
||||
args:
|
||||
- --data
|
||||
- $(workspaces.consume-something-as-file.path)/anything_param
|
||||
command:
|
||||
- sh
|
||||
- -ec
|
||||
- |
|
||||
program_path=$(mktemp)
|
||||
printf "%s" "$0" > "$program_path"
|
||||
python3 -u "$program_path" "$@"
|
||||
- |
|
||||
def consume_something_as_file(data_path):
|
||||
with open(data_path) as f:
|
||||
print("consume_something_as_file: " + f.read())
|
||||
|
||||
import argparse
|
||||
_parser = argparse.ArgumentParser(prog='Consume something as file', description='')
|
||||
_parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS)
|
||||
_parsed_args = vars(_parser.parse_args())
|
||||
|
||||
_outputs = consume_something_as_file(**_parsed_args)
|
||||
image: python:3.7
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
pipelines.kubeflow.org/generation: ''
|
||||
pipelines.kubeflow.org/cache_enabled: "true"
|
||||
annotations:
|
||||
pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
|
||||
{"args": ["--data", {"inputPath": "data"}], "command": ["sh", "-ec",
|
||||
"program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3
|
||||
-u \"$program_path\" \"$@\"\n", "def consume_something_as_file(data_path):\n with
|
||||
open(data_path) as f:\n print(\"consume_something_as_file: \"
|
||||
+ f.read())\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Consume
|
||||
something as file'', description='''')\n_parser.add_argument(\"--data\",
|
||||
dest=\"data_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args
|
||||
= vars(_parser.parse_args())\n\n_outputs = consume_something_as_file(**_parsed_args)\n"],
|
||||
"image": "python:3.7"}}, "inputs": [{"name": "data", "type": "Something"}],
|
||||
"name": "Consume something as file"}'
|
||||
tekton.dev/template: ''
|
||||
workspaces:
|
||||
- name: consume-something-as-file
|
||||
timeout: 0s
|
||||
workspaces:
|
||||
- name: consume-something-as-file
|
||||
workspace: data-passing-pipeline
|
||||
- name: consume-something-as-file-2
|
||||
taskSpec:
|
||||
steps:
|
||||
- image: busybox
|
||||
name: copy-inputs
|
||||
script: |
|
||||
#!/bin/sh
|
||||
set -exo pipefail
|
||||
echo -n "something_param" > $(workspaces.consume-something-as-file-2.path)/something_param
|
||||
- name: main
|
||||
args:
|
||||
- --data
|
||||
- $(workspaces.consume-something-as-file-2.path)/something_param
|
||||
command:
|
||||
- sh
|
||||
- -ec
|
||||
- |
|
||||
program_path=$(mktemp)
|
||||
printf "%s" "$0" > "$program_path"
|
||||
python3 -u "$program_path" "$@"
|
||||
- |
|
||||
def consume_something_as_file(data_path):
|
||||
with open(data_path) as f:
|
||||
print("consume_something_as_file: " + f.read())
|
||||
|
||||
import argparse
|
||||
_parser = argparse.ArgumentParser(prog='Consume something as file', description='')
|
||||
_parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS)
|
||||
_parsed_args = vars(_parser.parse_args())
|
||||
|
||||
_outputs = consume_something_as_file(**_parsed_args)
|
||||
image: python:3.7
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
pipelines.kubeflow.org/generation: ''
|
||||
pipelines.kubeflow.org/cache_enabled: "true"
|
||||
annotations:
|
||||
pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
|
||||
{"args": ["--data", {"inputPath": "data"}], "command": ["sh", "-ec",
|
||||
"program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3
|
||||
-u \"$program_path\" \"$@\"\n", "def consume_something_as_file(data_path):\n with
|
||||
open(data_path) as f:\n print(\"consume_something_as_file: \"
|
||||
+ f.read())\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Consume
|
||||
something as file'', description='''')\n_parser.add_argument(\"--data\",
|
||||
dest=\"data_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args
|
||||
= vars(_parser.parse_args())\n\n_outputs = consume_something_as_file(**_parsed_args)\n"],
|
||||
"image": "python:3.7"}}, "inputs": [{"name": "data", "type": "Something"}],
|
||||
"name": "Consume something as file"}'
|
||||
tekton.dev/template: ''
|
||||
workspaces:
|
||||
- name: consume-something-as-file-2
|
||||
timeout: 0s
|
||||
workspaces:
|
||||
- name: consume-something-as-file-2
|
||||
workspace: data-passing-pipeline
|
||||
- name: consume-string-as-file
|
||||
taskSpec:
|
||||
steps:
|
||||
- image: busybox
|
||||
name: copy-inputs
|
||||
script: |
|
||||
#!/bin/sh
|
||||
set -exo pipefail
|
||||
echo -n "anything_param" > $(workspaces.consume-string-as-file.path)/anything_param
|
||||
- name: main
|
||||
args:
|
||||
- --data
|
||||
- $(workspaces.consume-string-as-file.path)/anything_param
|
||||
command:
|
||||
- sh
|
||||
- -ec
|
||||
- |
|
||||
program_path=$(mktemp)
|
||||
printf "%s" "$0" > "$program_path"
|
||||
python3 -u "$program_path" "$@"
|
||||
- |
|
||||
def consume_string_as_file(data_path):
|
||||
with open(data_path) as f:
|
||||
print("consume_string_as_file: " + f.read())
|
||||
|
||||
import argparse
|
||||
_parser = argparse.ArgumentParser(prog='Consume string as file', description='')
|
||||
_parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS)
|
||||
_parsed_args = vars(_parser.parse_args())
|
||||
|
||||
_outputs = consume_string_as_file(**_parsed_args)
|
||||
image: python:3.7
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
pipelines.kubeflow.org/generation: ''
|
||||
pipelines.kubeflow.org/cache_enabled: "true"
|
||||
annotations:
|
||||
pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
|
||||
{"args": ["--data", {"inputPath": "data"}], "command": ["sh", "-ec",
|
||||
"program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3
|
||||
-u \"$program_path\" \"$@\"\n", "def consume_string_as_file(data_path):\n with
|
||||
open(data_path) as f:\n print(\"consume_string_as_file: \" +
|
||||
f.read())\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Consume
|
||||
string as file'', description='''')\n_parser.add_argument(\"--data\",
|
||||
dest=\"data_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args
|
||||
= vars(_parser.parse_args())\n\n_outputs = consume_string_as_file(**_parsed_args)\n"],
|
||||
"image": "python:3.7"}}, "inputs": [{"name": "data", "type": "String"}],
|
||||
"name": "Consume string as file"}'
|
||||
tekton.dev/template: ''
|
||||
workspaces:
|
||||
- name: consume-string-as-file
|
||||
timeout: 0s
|
||||
workspaces:
|
||||
- name: consume-string-as-file
|
||||
workspace: data-passing-pipeline
|
||||
- name: consume-string-as-file-2
|
||||
taskSpec:
|
||||
steps:
|
||||
- image: busybox
|
||||
name: copy-inputs
|
||||
script: |
|
||||
#!/bin/sh
|
||||
set -exo pipefail
|
||||
echo -n "string_param" > $(workspaces.consume-string-as-file-2.path)/string_param
|
||||
- name: main
|
||||
args:
|
||||
- --data
|
||||
- $(workspaces.consume-string-as-file-2.path)/string_param
|
||||
command:
|
||||
- sh
|
||||
- -ec
|
||||
- |
|
||||
program_path=$(mktemp)
|
||||
printf "%s" "$0" > "$program_path"
|
||||
python3 -u "$program_path" "$@"
|
||||
- |
|
||||
def consume_string_as_file(data_path):
|
||||
with open(data_path) as f:
|
||||
print("consume_string_as_file: " + f.read())
|
||||
|
||||
import argparse
|
||||
_parser = argparse.ArgumentParser(prog='Consume string as file', description='')
|
||||
_parser.add_argument("--data", dest="data_path", type=str, required=True, default=argparse.SUPPRESS)
|
||||
_parsed_args = vars(_parser.parse_args())
|
||||
|
||||
_outputs = consume_string_as_file(**_parsed_args)
|
||||
image: python:3.7
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
pipelines.kubeflow.org/generation: ''
|
||||
pipelines.kubeflow.org/cache_enabled: "true"
|
||||
annotations:
|
||||
pipelines.kubeflow.org/component_spec: '{"implementation": {"container":
|
||||
{"args": ["--data", {"inputPath": "data"}], "command": ["sh", "-ec",
|
||||
"program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3
|
||||
-u \"$program_path\" \"$@\"\n", "def consume_string_as_file(data_path):\n with
|
||||
open(data_path) as f:\n print(\"consume_string_as_file: \" +
|
||||
f.read())\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Consume
|
||||
string as file'', description='''')\n_parser.add_argument(\"--data\",
|
||||
dest=\"data_path\", type=str, required=True, default=argparse.SUPPRESS)\n_parsed_args
|
||||
= vars(_parser.parse_args())\n\n_outputs = consume_string_as_file(**_parsed_args)\n"],
|
||||
"image": "python:3.7"}}, "inputs": [{"name": "data", "type": "String"}],
|
||||
"name": "Consume string as file"}'
|
||||
tekton.dev/template: ''
|
||||
workspaces:
|
||||
- name: consume-string-as-file-2
|
||||
timeout: 0s
|
||||
workspaces:
|
||||
- name: consume-string-as-file-2
|
||||
workspace: data-passing-pipeline
|
||||
workspaces:
|
||||
- name: data-passing-pipeline
|
||||
timeout: 0s
|
||||
workspaces:
|
||||
- name: data-passing-pipeline
|
||||
volumeClaimTemplate:
|
||||
spec:
|
||||
accessModes:
|
||||
- ReadWriteMany
|
||||
resources:
|
||||
requests:
|
||||
storage: 2Gi
|
||||
|
|
@ -107,6 +107,25 @@ spec:
|
|||
- sh
|
||||
- -c
|
||||
image: library/bash
|
||||
- image: busybox
|
||||
name: copy-results-artifacts
|
||||
script: |
|
||||
#!/bin/sh
|
||||
set -exo pipefail
|
||||
TOTAL_SIZE=0
|
||||
ARTIFACT_SIZE=`wc -c /tekton/home/tep-results/param1 | awk '{print $1}'`
|
||||
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
|
||||
touch $(results.param1.path)
|
||||
if [[ $TOTAL_SIZE -lt 3072 ]]; then
|
||||
cp /tekton/home/tep-results/param1 $(results.param1.path)
|
||||
fi
|
||||
ARTIFACT_SIZE=`wc -c /tekton/home/tep-results/param3-b | awk '{print $1}'`
|
||||
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
|
||||
touch $(results.param3-b.path)
|
||||
if [[ $TOTAL_SIZE -lt 3072 ]]; then
|
||||
cp /tekton/home/tep-results/param3-b $(results.param3-b.path)
|
||||
fi
|
||||
onError: continue
|
||||
results:
|
||||
- name: param1
|
||||
description: /tmp/outputs/param1/data
|
||||
|
|
|
|||
|
|
@ -107,6 +107,25 @@ spec:
|
|||
- sh
|
||||
- -c
|
||||
image: library/bash
|
||||
- image: busybox
|
||||
name: copy-results-artifacts
|
||||
script: |
|
||||
#!/bin/sh
|
||||
set -exo pipefail
|
||||
TOTAL_SIZE=0
|
||||
ARTIFACT_SIZE=`wc -c /tekton/home/tep-results/param1 | awk '{print $1}'`
|
||||
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
|
||||
touch $(results.param1.path)
|
||||
if [[ $TOTAL_SIZE -lt 3072 ]]; then
|
||||
cp /tekton/home/tep-results/param1 $(results.param1.path)
|
||||
fi
|
||||
ARTIFACT_SIZE=`wc -c /tekton/home/tep-results/param3-b | awk '{print $1}'`
|
||||
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
|
||||
touch $(results.param3-b.path)
|
||||
if [[ $TOTAL_SIZE -lt 3072 ]]; then
|
||||
cp /tekton/home/tep-results/param3-b $(results.param3-b.path)
|
||||
fi
|
||||
onError: continue
|
||||
results:
|
||||
- name: param1
|
||||
description: /tmp/outputs/param1/data
|
||||
|
|
|
|||
|
|
@ -107,6 +107,31 @@ spec:
|
|||
- sh
|
||||
- -c
|
||||
image: library/bash
|
||||
- image: busybox
|
||||
name: copy-results-artifacts
|
||||
script: |
|
||||
#!/bin/sh
|
||||
set -exo pipefail
|
||||
TOTAL_SIZE=0
|
||||
ARTIFACT_SIZE=`wc -c /tekton/home/tep-results/param2 | awk '{print $1}'`
|
||||
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
|
||||
touch $(results.param2.path)
|
||||
if [[ $TOTAL_SIZE -lt 3072 ]]; then
|
||||
cp /tekton/home/tep-results/param2 $(results.param2.path)
|
||||
fi
|
||||
ARTIFACT_SIZE=`wc -c /tekton/home/tep-results/param3 | awk '{print $1}'`
|
||||
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
|
||||
touch $(results.param3.path)
|
||||
if [[ $TOTAL_SIZE -lt 3072 ]]; then
|
||||
cp /tekton/home/tep-results/param3 $(results.param3.path)
|
||||
fi
|
||||
ARTIFACT_SIZE=`wc -c /tekton/home/tep-results/superlongnamesuperlongnamesuperlongnamesuperlongnamesuper | awk '{print $1}'`
|
||||
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
|
||||
touch $(results.superlongnamesuperlongnamesuperlongnamesuperlongnamesuper.path)
|
||||
if [[ $TOTAL_SIZE -lt 3072 ]]; then
|
||||
cp /tekton/home/tep-results/superlongnamesuperlongnamesuperlongnamesuperlongnamesuper $(results.superlongnamesuperlongnamesuperlongnamesuperlongnamesuper.path)
|
||||
fi
|
||||
onError: continue
|
||||
results:
|
||||
- name: param1
|
||||
description: /tmp/outputs/param1/data
|
||||
|
|
|
|||
|
|
@ -107,6 +107,31 @@ spec:
|
|||
- sh
|
||||
- -c
|
||||
image: library/bash
|
||||
- image: busybox
|
||||
name: copy-results-artifacts
|
||||
script: |
|
||||
#!/bin/sh
|
||||
set -exo pipefail
|
||||
TOTAL_SIZE=0
|
||||
ARTIFACT_SIZE=`wc -c /tekton/home/tep-results/param2 | awk '{print $1}'`
|
||||
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
|
||||
touch $(results.param2.path)
|
||||
if [[ $TOTAL_SIZE -lt 3072 ]]; then
|
||||
cp /tekton/home/tep-results/param2 $(results.param2.path)
|
||||
fi
|
||||
ARTIFACT_SIZE=`wc -c /tekton/home/tep-results/param3 | awk '{print $1}'`
|
||||
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
|
||||
touch $(results.param3.path)
|
||||
if [[ $TOTAL_SIZE -lt 3072 ]]; then
|
||||
cp /tekton/home/tep-results/param3 $(results.param3.path)
|
||||
fi
|
||||
ARTIFACT_SIZE=`wc -c /tekton/home/tep-results/superlongnamesuperlongnamesuperlongnamesuperlongnamesuper | awk '{print $1}'`
|
||||
TOTAL_SIZE=$( expr $TOTAL_SIZE + $ARTIFACT_SIZE)
|
||||
touch $(results.superlongnamesuperlongnamesuperlongnamesuperlongnamesuper.path)
|
||||
if [[ $TOTAL_SIZE -lt 3072 ]]; then
|
||||
cp /tekton/home/tep-results/superlongnamesuperlongnamesuperlongnamesuperlongnamesuper $(results.superlongnamesuperlongnamesuperlongnamesuperlongnamesuper.path)
|
||||
fi
|
||||
onError: continue
|
||||
results:
|
||||
- name: param1
|
||||
description: /tmp/outputs/param1/data
|
||||
|
|
|
|||
|
|
@ -191,6 +191,7 @@ func EnableCustomTaskFeatureFlag(ctx context.Context) context.Context {
|
|||
defaults, _ := config.NewDefaultsFromMap(map[string]string{})
|
||||
featureFlags, _ := config.NewFeatureFlagsFromMap(map[string]string{
|
||||
"enable-custom-tasks": "true",
|
||||
"enable-api-fields" : "alpha",
|
||||
})
|
||||
artifactBucket, _ := config.NewArtifactBucketFromMap(map[string]string{})
|
||||
artifactPVC, _ := config.NewArtifactPVCFromMap(map[string]string{})
|
||||
|
|
|
|||
Loading…
Reference in New Issue