Change to use component to replace containerOp for any sequencer (#675)
* Change to use component to replace containerOp for any sequencer * Remove context input and change results path * 1: Update to use component rather than containerOp 2: Fix lint/license issue
This commit is contained in:
parent
afc8460ef1
commit
18c9cc8bcc
|
|
@ -15,7 +15,6 @@
|
|||
from typing import List, Iterable, Union
|
||||
from kfp import dsl
|
||||
from kfp import components
|
||||
from kfp.dsl._container_op import ContainerOp
|
||||
from kfp.dsl._pipeline_param import ConditionOperator
|
||||
from kfp_tekton.compiler._k8s_helper import sanitize_k8s_name
|
||||
|
||||
|
|
@ -26,7 +25,10 @@ DEFAULT_CONDITION_OUTPUT_KEYWORD = "outcome"
|
|||
TEKTON_CUSTOM_TASK_IMAGES = [CEL_EVAL_IMAGE]
|
||||
|
||||
|
||||
class AnySequencer(ContainerOp):
|
||||
def AnySequencer(any: Iterable[Union[dsl.ContainerOp, ConditionOperator]],
|
||||
name: str = None, statusPath: str = None,
|
||||
skippingPolicy: str = None, errorPolicy: str = None,
|
||||
image: str = ANY_SEQUENCER_IMAGE):
|
||||
"""A containerOp that will proceed when any of the dependent containerOps completed
|
||||
successfully
|
||||
|
||||
|
|
@ -36,50 +38,65 @@ class AnySequencer(ContainerOp):
|
|||
|
||||
any: List of `Conditional` containerOps that deploy together with the `main`
|
||||
containerOp, or the condtion that must meet to continue.
|
||||
|
||||
statusPath: The location to write the output stauts
|
||||
|
||||
skippingPolicy: Determines for the Any Sequencer reacts to
|
||||
no-dependency-condition-matching case. Values can be one of `skipOnNoMatch`
|
||||
or `errorOnNoMatch`, a status with value "Skipped" will be generated and the
|
||||
exit status will still be succeeded on `skipOnNoMatch`.
|
||||
|
||||
errorPolicy: The standard field, either `failOnError` or `continueOnError`. On
|
||||
`continueOnError`, a status with value "Failed" will be generated
|
||||
but the exit status will still be succeeded. For `Fail_on_error` the
|
||||
Any Sequencer should truly fail in the Tekton terms, as it does now.
|
||||
|
||||
image: The image to implement the any sequencer logic. Default to dspipelines/any-sequencer:latest.
|
||||
"""
|
||||
def __init__(self,
|
||||
any: Iterable[Union[dsl.ContainerOp, ConditionOperator]],
|
||||
name: str = None, statusPath: str = None,
|
||||
skippingPolicy: str = None, errorPolicy: str = None,
|
||||
image: str = ANY_SEQUENCER_IMAGE):
|
||||
arguments = [
|
||||
"--namespace",
|
||||
"$(context.pipelineRun.namespace)",
|
||||
"--prName",
|
||||
"$(context.pipelineRun.name)"
|
||||
]
|
||||
tasks_list = []
|
||||
condition_list = []
|
||||
file_outputs = None
|
||||
for cop in any:
|
||||
if isinstance(cop, dsl.ContainerOp):
|
||||
cop_name = sanitize_k8s_name(cop.name)
|
||||
tasks_list.append(cop_name)
|
||||
elif isinstance(cop, ConditionOperator):
|
||||
condition_list.append(cop)
|
||||
if len(tasks_list) > 0:
|
||||
task_list_str = ",".join(tasks_list)
|
||||
arguments.extend(["--taskList", task_list_str])
|
||||
if statusPath is not None:
|
||||
file_outputs = {"status": statusPath}
|
||||
arguments.extend(["--statusPath", statusPath])
|
||||
if skippingPolicy is not None:
|
||||
assert skippingPolicy == "skipOnNoMatch" or skippingPolicy == "errorOnNoMatch"
|
||||
arguments.extend(["--skippingPolicy", skippingPolicy])
|
||||
if errorPolicy is not None:
|
||||
assert errorPolicy == "continueOnError" or errorPolicy == "failOnError"
|
||||
arguments.extend(["--errorPolicy", errorPolicy])
|
||||
arguments = [
|
||||
"--namespace",
|
||||
"$(context.pipelineRun.namespace)",
|
||||
"--prName",
|
||||
"$(context.pipelineRun.name)"
|
||||
]
|
||||
tasks_list = []
|
||||
condition_list = []
|
||||
file_outputs = None
|
||||
for cop in any:
|
||||
if isinstance(cop, dsl.ContainerOp):
|
||||
cop_name = sanitize_k8s_name(cop.name)
|
||||
tasks_list.append(cop_name)
|
||||
elif isinstance(cop, ConditionOperator):
|
||||
condition_list.append(cop)
|
||||
if len(tasks_list) > 0:
|
||||
task_list_str = "\'" + ",".join(tasks_list) + "\'"
|
||||
arguments.extend(["--taskList", task_list_str])
|
||||
if statusPath is not None:
|
||||
file_outputs = '{outputPath: %s}' % statusPath
|
||||
arguments.extend(["--statusPath", file_outputs])
|
||||
if skippingPolicy is not None:
|
||||
assert skippingPolicy == "skipOnNoMatch" or skippingPolicy == "errorOnNoMatch"
|
||||
arguments.extend(["--skippingPolicy", skippingPolicy])
|
||||
if errorPolicy is not None:
|
||||
assert errorPolicy == "continueOnError" or errorPolicy == "failOnError"
|
||||
arguments.extend(["--errorPolicy", errorPolicy])
|
||||
conditonArgs = processConditionArgs(condition_list)
|
||||
arguments.extend(conditonArgs)
|
||||
|
||||
conditonArgs = processConditionArgs(condition_list)
|
||||
arguments.extend(conditonArgs)
|
||||
|
||||
super().__init__(
|
||||
name=name,
|
||||
image=image,
|
||||
file_outputs=file_outputs,
|
||||
command="any-task",
|
||||
arguments=arguments,
|
||||
)
|
||||
AnyOp_yaml = '''\
|
||||
name: %s
|
||||
description: 'Proceed when any of the dependents completed successfully'
|
||||
outputs:
|
||||
- {name: %s, description: 'The output file to create the status'}
|
||||
implementation:
|
||||
container:
|
||||
image: %s
|
||||
command: [any-task]
|
||||
args: [%s]
|
||||
''' % (name, statusPath, image, ",".join(arguments))
|
||||
AnyOp_template = components.load_component_from_text(AnyOp_yaml)
|
||||
AnyOp = AnyOp_template()
|
||||
return AnyOp
|
||||
|
||||
|
||||
def processOperand(operand) -> (str, str):
|
||||
|
|
|
|||
|
|
@ -13,20 +13,38 @@
|
|||
# limitations under the License.
|
||||
|
||||
from kfp import dsl
|
||||
from kfp import components
|
||||
from kfp_tekton.compiler import TektonCompiler
|
||||
from kfp_tekton.tekton import after_any
|
||||
|
||||
|
||||
def flip_coin_op():
|
||||
def flip_coin() -> str:
|
||||
"""Flip a coin and output heads or tails randomly."""
|
||||
return dsl.ContainerOp(
|
||||
name='flipCoin',
|
||||
image='python:alpine3.6',
|
||||
command=['sh', '-c'],
|
||||
arguments=['python -c "import random; result = \'heads\' if random.randint(0,1) == 0 '
|
||||
'else \'tails\'; print(result)" | tee /tmp/output'],
|
||||
file_outputs={'output': '/tmp/output'}
|
||||
)
|
||||
import random
|
||||
result = 'heads' if random.randint(0, 1) == 0 else 'tails'
|
||||
print(result)
|
||||
return result
|
||||
|
||||
|
||||
flip_coin_op = components.create_component_from_func(
|
||||
flip_coin, base_image='python:alpine3.6')
|
||||
|
||||
|
||||
component_text = '''\
|
||||
name: 'sleepComponent'
|
||||
description: |
|
||||
Component for sleep
|
||||
inputs:
|
||||
- {name: seconds, description: 'Sleep for some seconds', default: 10, type: int}
|
||||
implementation:
|
||||
container:
|
||||
image: alpine:latest
|
||||
command: ['sleep']
|
||||
args: [
|
||||
{inputValue: seconds},
|
||||
]
|
||||
'''
|
||||
sleepOp_template = components.load_component_from_text(component_text)
|
||||
|
||||
|
||||
@dsl.pipeline(
|
||||
|
|
@ -35,37 +53,19 @@ def flip_coin_op():
|
|||
)
|
||||
def any_sequence_pipeline(
|
||||
):
|
||||
task1 = dsl.ContainerOp(
|
||||
name="task1",
|
||||
image="registry.access.redhat.com/ubi8/ubi-minimal",
|
||||
command=["/bin/bash", "-c"],
|
||||
arguments=["sleep 15"]
|
||||
)
|
||||
task1 = sleepOp_template(15)
|
||||
|
||||
task2 = dsl.ContainerOp(
|
||||
name="task2",
|
||||
image="registry.access.redhat.com/ubi8/ubi-minimal",
|
||||
command=["/bin/bash", "-c"],
|
||||
arguments=["sleep 200"]
|
||||
)
|
||||
task2 = sleepOp_template(200)
|
||||
|
||||
task3 = dsl.ContainerOp(
|
||||
name="task3",
|
||||
image="registry.access.redhat.com/ubi8/ubi-minimal",
|
||||
command=["/bin/bash", "-c"],
|
||||
arguments=["sleep 300"]
|
||||
)
|
||||
task3 = sleepOp_template(300)
|
||||
|
||||
flip_out = flip_coin_op()
|
||||
|
||||
flip_out.after(task1)
|
||||
|
||||
dsl.ContainerOp(
|
||||
name="task4",
|
||||
image="registry.access.redhat.com/ubi8/ubi-minimal",
|
||||
command=["/bin/bash", "-c"],
|
||||
arguments=["sleep 30"]
|
||||
).apply(after_any([task2, task3, flip_out.outputs['output'] == "heads"], "any_test", '/tekton/results/status'))
|
||||
task4 = sleepOp_template(30)
|
||||
|
||||
task4.apply(after_any([task2, task3, flip_out.outputs['output'] == "heads"], "any_test", 'status'))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
|
|
|||
|
|
@ -18,116 +18,191 @@ metadata:
|
|||
name: any-sequencer
|
||||
annotations:
|
||||
tekton.dev/output_artifacts: '{"any-test": [{"key": "artifacts/$PIPELINERUN/any-test/status.tgz",
|
||||
"name": "any-test-status", "path": "/tekton/results/status"}], "flipcoin": [{"key":
|
||||
"artifacts/$PIPELINERUN/flipcoin/output.tgz", "name": "flipcoin-output", "path":
|
||||
"/tmp/output"}]}'
|
||||
"name": "any-test-status", "path": "/tmp/outputs/status/data"}], "flip-coin":
|
||||
[{"key": "artifacts/$PIPELINERUN/flip-coin/Output.tgz", "name": "flip-coin-Output",
|
||||
"path": "/tmp/outputs/Output/data"}]}'
|
||||
tekton.dev/input_artifacts: '{}'
|
||||
tekton.dev/artifact_bucket: mlpipeline
|
||||
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
|
||||
tekton.dev/artifact_endpoint_scheme: http://
|
||||
tekton.dev/artifact_items: '{"any-test": [["status", "$(results.status.path)"]],
|
||||
"flipcoin": [["output", "$(results.output.path)"]], "task1": [], "task2": [],
|
||||
"task3": [], "task4": []}'
|
||||
"flip-coin": [["Output", "$(results.output.path)"]], "sleepcomponent": [], "sleepcomponent-2":
|
||||
[], "sleepcomponent-3": [], "sleepcomponent-4": []}'
|
||||
sidecar.istio.io/inject: "false"
|
||||
pipelines.kubeflow.org/pipeline_spec: '{"description": "Any Sequencer Component
|
||||
Demo", "name": "Any Sequencer"}'
|
||||
spec:
|
||||
pipelineSpec:
|
||||
tasks:
|
||||
- name: task1
|
||||
- name: sleepcomponent
|
||||
taskSpec:
|
||||
steps:
|
||||
- name: main
|
||||
args:
|
||||
- sleep 15
|
||||
- '15'
|
||||
command:
|
||||
- /bin/bash
|
||||
- -c
|
||||
image: registry.access.redhat.com/ubi8/ubi-minimal
|
||||
- sleep
|
||||
image: alpine:latest
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
pipelines.kubeflow.org/generation: ''
|
||||
pipelines.kubeflow.org/cache_enabled: "true"
|
||||
annotations:
|
||||
pipelines.kubeflow.org/component_spec: '{"description": "Component for
|
||||
sleep\n", "implementation": {"container": {"args": [{"inputValue": "seconds"}],
|
||||
"command": ["sleep"], "image": "alpine:latest"}}, "inputs": [{"default":
|
||||
10, "description": "Sleep for some seconds", "name": "seconds", "type":
|
||||
"int"}], "name": "sleepComponent"}'
|
||||
tekton.dev/template: ''
|
||||
timeout: 0s
|
||||
- name: task2
|
||||
- name: sleepcomponent-2
|
||||
taskSpec:
|
||||
steps:
|
||||
- name: main
|
||||
args:
|
||||
- sleep 200
|
||||
- '200'
|
||||
command:
|
||||
- /bin/bash
|
||||
- -c
|
||||
image: registry.access.redhat.com/ubi8/ubi-minimal
|
||||
- sleep
|
||||
image: alpine:latest
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
pipelines.kubeflow.org/generation: ''
|
||||
pipelines.kubeflow.org/cache_enabled: "true"
|
||||
annotations:
|
||||
pipelines.kubeflow.org/component_spec: '{"description": "Component for
|
||||
sleep\n", "implementation": {"container": {"args": [{"inputValue": "seconds"}],
|
||||
"command": ["sleep"], "image": "alpine:latest"}}, "inputs": [{"default":
|
||||
10, "description": "Sleep for some seconds", "name": "seconds", "type":
|
||||
"int"}], "name": "sleepComponent"}'
|
||||
tekton.dev/template: ''
|
||||
timeout: 0s
|
||||
- name: task3
|
||||
- name: sleepcomponent-3
|
||||
taskSpec:
|
||||
steps:
|
||||
- name: main
|
||||
args:
|
||||
- sleep 300
|
||||
- '300'
|
||||
command:
|
||||
- /bin/bash
|
||||
- -c
|
||||
image: registry.access.redhat.com/ubi8/ubi-minimal
|
||||
- sleep
|
||||
image: alpine:latest
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
pipelines.kubeflow.org/generation: ''
|
||||
pipelines.kubeflow.org/cache_enabled: "true"
|
||||
annotations:
|
||||
pipelines.kubeflow.org/component_spec: '{"description": "Component for
|
||||
sleep\n", "implementation": {"container": {"args": [{"inputValue": "seconds"}],
|
||||
"command": ["sleep"], "image": "alpine:latest"}}, "inputs": [{"default":
|
||||
10, "description": "Sleep for some seconds", "name": "seconds", "type":
|
||||
"int"}], "name": "sleepComponent"}'
|
||||
tekton.dev/template: ''
|
||||
timeout: 0s
|
||||
- name: flipcoin
|
||||
- name: flip-coin
|
||||
taskSpec:
|
||||
steps:
|
||||
- name: main
|
||||
args:
|
||||
- python -c "import random; result = 'heads' if random.randint(0,1) == 0
|
||||
else 'tails'; print(result)" | tee $(results.output.path)
|
||||
- '----output-paths'
|
||||
- $(results.output.path)
|
||||
command:
|
||||
- sh
|
||||
- -c
|
||||
- -ec
|
||||
- |
|
||||
program_path=$(mktemp)
|
||||
printf "%s" "$0" > "$program_path"
|
||||
python3 -u "$program_path" "$@"
|
||||
- |
|
||||
def flip_coin():
|
||||
"""Flip a coin and output heads or tails randomly."""
|
||||
import random
|
||||
result = 'heads' if random.randint(0, 1) == 0 else 'tails'
|
||||
print(result)
|
||||
return result
|
||||
|
||||
def _serialize_str(str_value: str) -> str:
|
||||
if not isinstance(str_value, str):
|
||||
raise TypeError('Value "{}" has type "{}" instead of str.'.format(str(str_value), str(type(str_value))))
|
||||
return str_value
|
||||
|
||||
import argparse
|
||||
_parser = argparse.ArgumentParser(prog='Flip coin', description='Flip a coin and output heads or tails randomly.')
|
||||
_parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1)
|
||||
_parsed_args = vars(_parser.parse_args())
|
||||
_output_files = _parsed_args.pop("_output_paths", [])
|
||||
|
||||
_outputs = flip_coin(**_parsed_args)
|
||||
|
||||
_outputs = [_outputs]
|
||||
|
||||
_output_serializers = [
|
||||
_serialize_str,
|
||||
|
||||
]
|
||||
|
||||
import os
|
||||
for idx, output_file in enumerate(_output_files):
|
||||
try:
|
||||
os.makedirs(os.path.dirname(output_file))
|
||||
except OSError:
|
||||
pass
|
||||
with open(output_file, 'w') as f:
|
||||
f.write(_output_serializers[idx](_outputs[idx]))
|
||||
image: python:alpine3.6
|
||||
results:
|
||||
- name: output
|
||||
description: /tmp/output
|
||||
description: /tmp/outputs/Output/data
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
pipelines.kubeflow.org/generation: ''
|
||||
pipelines.kubeflow.org/cache_enabled: "true"
|
||||
annotations:
|
||||
pipelines.kubeflow.org/component_spec: '{"description": "Flip a coin and
|
||||
output heads or tails randomly.", "implementation": {"container": {"args":
|
||||
["----output-paths", {"outputPath": "Output"}], "command": ["sh", "-ec",
|
||||
"program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3
|
||||
-u \"$program_path\" \"$@\"\n", "def flip_coin():\n \"\"\"Flip a
|
||||
coin and output heads or tails randomly.\"\"\"\n import random\n result
|
||||
= ''heads'' if random.randint(0, 1) == 0 else ''tails''\n print(result)\n return
|
||||
result\n\ndef _serialize_str(str_value: str) -> str:\n if not isinstance(str_value,
|
||||
str):\n raise TypeError(''Value \"{}\" has type \"{}\" instead
|
||||
of str.''.format(str(str_value), str(type(str_value))))\n return
|
||||
str_value\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Flip
|
||||
coin'', description=''Flip a coin and output heads or tails randomly.'')\n_parser.add_argument(\"----output-paths\",
|
||||
dest=\"_output_paths\", type=str, nargs=1)\n_parsed_args = vars(_parser.parse_args())\n_output_files
|
||||
= _parsed_args.pop(\"_output_paths\", [])\n\n_outputs = flip_coin(**_parsed_args)\n\n_outputs
|
||||
= [_outputs]\n\n_output_serializers = [\n _serialize_str,\n\n]\n\nimport
|
||||
os\nfor idx, output_file in enumerate(_output_files):\n try:\n os.makedirs(os.path.dirname(output_file))\n except
|
||||
OSError:\n pass\n with open(output_file, ''w'') as f:\n f.write(_output_serializers[idx](_outputs[idx]))\n"],
|
||||
"image": "python:alpine3.6"}}, "name": "Flip coin", "outputs": [{"name":
|
||||
"Output", "type": "String"}]}'
|
||||
tekton.dev/template: ''
|
||||
runAfter:
|
||||
- task1
|
||||
- sleepcomponent
|
||||
timeout: 0s
|
||||
- name: task4
|
||||
- name: sleepcomponent-4
|
||||
taskSpec:
|
||||
steps:
|
||||
- name: main
|
||||
args:
|
||||
- sleep 30
|
||||
- '30'
|
||||
command:
|
||||
- /bin/bash
|
||||
- -c
|
||||
image: registry.access.redhat.com/ubi8/ubi-minimal
|
||||
- sleep
|
||||
image: alpine:latest
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
pipelines.kubeflow.org/generation: ''
|
||||
pipelines.kubeflow.org/cache_enabled: "true"
|
||||
annotations:
|
||||
pipelines.kubeflow.org/component_spec: '{"description": "Component for
|
||||
sleep\n", "implementation": {"container": {"args": [{"inputValue": "seconds"}],
|
||||
"command": ["sleep"], "image": "alpine:latest"}}, "inputs": [{"default":
|
||||
10, "description": "Sleep for some seconds", "name": "seconds", "type":
|
||||
"int"}], "name": "sleepComponent"}'
|
||||
tekton.dev/template: ''
|
||||
runAfter:
|
||||
- any-test
|
||||
|
|
@ -142,23 +217,31 @@ spec:
|
|||
- --prName
|
||||
- $(params.pipelineRun-name)
|
||||
- --taskList
|
||||
- task2,task3
|
||||
- sleepcomponent-2,sleepcomponent-3
|
||||
- --statusPath
|
||||
- $(results.status.path)
|
||||
- -c
|
||||
- results_flipcoin_output == 'heads'
|
||||
- results_flip-coin_output == 'heads'
|
||||
command:
|
||||
- any-task
|
||||
image: dspipelines/any-sequencer:latest
|
||||
results:
|
||||
- name: status
|
||||
description: /tekton/results/status
|
||||
description: /tmp/outputs/status/data
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
pipelines.kubeflow.org/generation: ''
|
||||
pipelines.kubeflow.org/cache_enabled: "true"
|
||||
annotations:
|
||||
pipelines.kubeflow.org/component_spec: '{"description": "Proceed when
|
||||
any of the dependents completed successfully", "implementation": {"container":
|
||||
{"args": ["--namespace", "$(context.pipelineRun.namespace)", "--prName",
|
||||
"$(context.pipelineRun.name)", "--taskList", "sleepcomponent-2,sleepcomponent-3",
|
||||
"--statusPath", {"outputPath": "status"}, "-c", "results_flip-coin_output
|
||||
== ''heads''"], "command": ["any-task"], "image": "dspipelines/any-sequencer:latest"}},
|
||||
"name": "any_test", "outputs": [{"description": "The output file to
|
||||
create the status", "name": "status"}]}'
|
||||
tekton.dev/template: ''
|
||||
params:
|
||||
- name: pipelineRun-name
|
||||
|
|
|
|||
|
|
@ -18,116 +18,191 @@ metadata:
|
|||
name: any-sequencer
|
||||
annotations:
|
||||
tekton.dev/output_artifacts: '{"any-test": [{"key": "artifacts/$PIPELINERUN/any-test/status.tgz",
|
||||
"name": "any-test-status", "path": "/tekton/results/status"}], "flipcoin": [{"key":
|
||||
"artifacts/$PIPELINERUN/flipcoin/output.tgz", "name": "flipcoin-output", "path":
|
||||
"/tmp/output"}]}'
|
||||
"name": "any-test-status", "path": "/tmp/outputs/status/data"}], "flip-coin":
|
||||
[{"key": "artifacts/$PIPELINERUN/flip-coin/Output.tgz", "name": "flip-coin-Output",
|
||||
"path": "/tmp/outputs/Output/data"}]}'
|
||||
tekton.dev/input_artifacts: '{}'
|
||||
tekton.dev/artifact_bucket: mlpipeline
|
||||
tekton.dev/artifact_endpoint: minio-service.kubeflow:9000
|
||||
tekton.dev/artifact_endpoint_scheme: http://
|
||||
tekton.dev/artifact_items: '{"any-test": [["status", "$(results.status.path)"]],
|
||||
"flipcoin": [["output", "$(results.output.path)"]], "task1": [], "task2": [],
|
||||
"task3": [], "task4": []}'
|
||||
"flip-coin": [["Output", "$(results.output.path)"]], "sleepcomponent": [], "sleepcomponent-2":
|
||||
[], "sleepcomponent-3": [], "sleepcomponent-4": []}'
|
||||
sidecar.istio.io/inject: "false"
|
||||
pipelines.kubeflow.org/pipeline_spec: '{"description": "Any Sequencer Component
|
||||
Demo", "name": "Any Sequencer"}'
|
||||
spec:
|
||||
pipelineSpec:
|
||||
tasks:
|
||||
- name: task1
|
||||
- name: sleepcomponent
|
||||
taskSpec:
|
||||
steps:
|
||||
- name: main
|
||||
args:
|
||||
- sleep 15
|
||||
- '15'
|
||||
command:
|
||||
- /bin/bash
|
||||
- -c
|
||||
image: registry.access.redhat.com/ubi8/ubi-minimal
|
||||
- sleep
|
||||
image: alpine:latest
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
pipelines.kubeflow.org/generation: ''
|
||||
pipelines.kubeflow.org/cache_enabled: "true"
|
||||
annotations:
|
||||
pipelines.kubeflow.org/component_spec: '{"description": "Component for
|
||||
sleep\n", "implementation": {"container": {"args": [{"inputValue": "seconds"}],
|
||||
"command": ["sleep"], "image": "alpine:latest"}}, "inputs": [{"default":
|
||||
10, "description": "Sleep for some seconds", "name": "seconds", "type":
|
||||
"int"}], "name": "sleepComponent"}'
|
||||
tekton.dev/template: ''
|
||||
timeout: 0s
|
||||
- name: task2
|
||||
- name: sleepcomponent-2
|
||||
taskSpec:
|
||||
steps:
|
||||
- name: main
|
||||
args:
|
||||
- sleep 200
|
||||
- '200'
|
||||
command:
|
||||
- /bin/bash
|
||||
- -c
|
||||
image: registry.access.redhat.com/ubi8/ubi-minimal
|
||||
- sleep
|
||||
image: alpine:latest
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
pipelines.kubeflow.org/generation: ''
|
||||
pipelines.kubeflow.org/cache_enabled: "true"
|
||||
annotations:
|
||||
pipelines.kubeflow.org/component_spec: '{"description": "Component for
|
||||
sleep\n", "implementation": {"container": {"args": [{"inputValue": "seconds"}],
|
||||
"command": ["sleep"], "image": "alpine:latest"}}, "inputs": [{"default":
|
||||
10, "description": "Sleep for some seconds", "name": "seconds", "type":
|
||||
"int"}], "name": "sleepComponent"}'
|
||||
tekton.dev/template: ''
|
||||
timeout: 0s
|
||||
- name: task3
|
||||
- name: sleepcomponent-3
|
||||
taskSpec:
|
||||
steps:
|
||||
- name: main
|
||||
args:
|
||||
- sleep 300
|
||||
- '300'
|
||||
command:
|
||||
- /bin/bash
|
||||
- -c
|
||||
image: registry.access.redhat.com/ubi8/ubi-minimal
|
||||
- sleep
|
||||
image: alpine:latest
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
pipelines.kubeflow.org/generation: ''
|
||||
pipelines.kubeflow.org/cache_enabled: "true"
|
||||
annotations:
|
||||
pipelines.kubeflow.org/component_spec: '{"description": "Component for
|
||||
sleep\n", "implementation": {"container": {"args": [{"inputValue": "seconds"}],
|
||||
"command": ["sleep"], "image": "alpine:latest"}}, "inputs": [{"default":
|
||||
10, "description": "Sleep for some seconds", "name": "seconds", "type":
|
||||
"int"}], "name": "sleepComponent"}'
|
||||
tekton.dev/template: ''
|
||||
timeout: 0s
|
||||
- name: flipcoin
|
||||
- name: flip-coin
|
||||
taskSpec:
|
||||
steps:
|
||||
- name: main
|
||||
args:
|
||||
- python -c "import random; result = 'heads' if random.randint(0,1) == 0
|
||||
else 'tails'; print(result)" | tee $(results.output.path)
|
||||
- '----output-paths'
|
||||
- $(results.output.path)
|
||||
command:
|
||||
- sh
|
||||
- -c
|
||||
- -ec
|
||||
- |
|
||||
program_path=$(mktemp)
|
||||
printf "%s" "$0" > "$program_path"
|
||||
python3 -u "$program_path" "$@"
|
||||
- |
|
||||
def flip_coin():
|
||||
"""Flip a coin and output heads or tails randomly."""
|
||||
import random
|
||||
result = 'heads' if random.randint(0, 1) == 0 else 'tails'
|
||||
print(result)
|
||||
return result
|
||||
|
||||
def _serialize_str(str_value: str) -> str:
|
||||
if not isinstance(str_value, str):
|
||||
raise TypeError('Value "{}" has type "{}" instead of str.'.format(str(str_value), str(type(str_value))))
|
||||
return str_value
|
||||
|
||||
import argparse
|
||||
_parser = argparse.ArgumentParser(prog='Flip coin', description='Flip a coin and output heads or tails randomly.')
|
||||
_parser.add_argument("----output-paths", dest="_output_paths", type=str, nargs=1)
|
||||
_parsed_args = vars(_parser.parse_args())
|
||||
_output_files = _parsed_args.pop("_output_paths", [])
|
||||
|
||||
_outputs = flip_coin(**_parsed_args)
|
||||
|
||||
_outputs = [_outputs]
|
||||
|
||||
_output_serializers = [
|
||||
_serialize_str,
|
||||
|
||||
]
|
||||
|
||||
import os
|
||||
for idx, output_file in enumerate(_output_files):
|
||||
try:
|
||||
os.makedirs(os.path.dirname(output_file))
|
||||
except OSError:
|
||||
pass
|
||||
with open(output_file, 'w') as f:
|
||||
f.write(_output_serializers[idx](_outputs[idx]))
|
||||
image: python:alpine3.6
|
||||
results:
|
||||
- name: output
|
||||
description: /tmp/output
|
||||
description: /tmp/outputs/Output/data
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
pipelines.kubeflow.org/generation: ''
|
||||
pipelines.kubeflow.org/cache_enabled: "true"
|
||||
annotations:
|
||||
pipelines.kubeflow.org/component_spec: '{"description": "Flip a coin and
|
||||
output heads or tails randomly.", "implementation": {"container": {"args":
|
||||
["----output-paths", {"outputPath": "Output"}], "command": ["sh", "-ec",
|
||||
"program_path=$(mktemp)\nprintf \"%s\" \"$0\" > \"$program_path\"\npython3
|
||||
-u \"$program_path\" \"$@\"\n", "def flip_coin():\n \"\"\"Flip a
|
||||
coin and output heads or tails randomly.\"\"\"\n import random\n result
|
||||
= ''heads'' if random.randint(0, 1) == 0 else ''tails''\n print(result)\n return
|
||||
result\n\ndef _serialize_str(str_value: str) -> str:\n if not isinstance(str_value,
|
||||
str):\n raise TypeError(''Value \"{}\" has type \"{}\" instead
|
||||
of str.''.format(str(str_value), str(type(str_value))))\n return
|
||||
str_value\n\nimport argparse\n_parser = argparse.ArgumentParser(prog=''Flip
|
||||
coin'', description=''Flip a coin and output heads or tails randomly.'')\n_parser.add_argument(\"----output-paths\",
|
||||
dest=\"_output_paths\", type=str, nargs=1)\n_parsed_args = vars(_parser.parse_args())\n_output_files
|
||||
= _parsed_args.pop(\"_output_paths\", [])\n\n_outputs = flip_coin(**_parsed_args)\n\n_outputs
|
||||
= [_outputs]\n\n_output_serializers = [\n _serialize_str,\n\n]\n\nimport
|
||||
os\nfor idx, output_file in enumerate(_output_files):\n try:\n os.makedirs(os.path.dirname(output_file))\n except
|
||||
OSError:\n pass\n with open(output_file, ''w'') as f:\n f.write(_output_serializers[idx](_outputs[idx]))\n"],
|
||||
"image": "python:alpine3.6"}}, "name": "Flip coin", "outputs": [{"name":
|
||||
"Output", "type": "String"}]}'
|
||||
tekton.dev/template: ''
|
||||
runAfter:
|
||||
- task1
|
||||
- sleepcomponent
|
||||
timeout: 0s
|
||||
- name: task4
|
||||
- name: sleepcomponent-4
|
||||
taskSpec:
|
||||
steps:
|
||||
- name: main
|
||||
args:
|
||||
- sleep 30
|
||||
- '30'
|
||||
command:
|
||||
- /bin/bash
|
||||
- -c
|
||||
image: registry.access.redhat.com/ubi8/ubi-minimal
|
||||
- sleep
|
||||
image: alpine:latest
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
pipelines.kubeflow.org/generation: ''
|
||||
pipelines.kubeflow.org/cache_enabled: "true"
|
||||
annotations:
|
||||
pipelines.kubeflow.org/component_spec: '{"description": "Component for
|
||||
sleep\n", "implementation": {"container": {"args": [{"inputValue": "seconds"}],
|
||||
"command": ["sleep"], "image": "alpine:latest"}}, "inputs": [{"default":
|
||||
10, "description": "Sleep for some seconds", "name": "seconds", "type":
|
||||
"int"}], "name": "sleepComponent"}'
|
||||
tekton.dev/template: ''
|
||||
runAfter:
|
||||
- any-test
|
||||
|
|
@ -142,23 +217,31 @@ spec:
|
|||
- --prName
|
||||
- $(params.pipelineRun-name)
|
||||
- --taskList
|
||||
- task2,task3
|
||||
- sleepcomponent-2,sleepcomponent-3
|
||||
- --statusPath
|
||||
- $(results.status.path)
|
||||
- -c
|
||||
- results_flipcoin_output == 'heads'
|
||||
- results_flip-coin_output == 'heads'
|
||||
command:
|
||||
- any-task
|
||||
image: dspipelines/any-sequencer:latest
|
||||
results:
|
||||
- name: status
|
||||
description: /tekton/results/status
|
||||
description: /tmp/outputs/status/data
|
||||
metadata:
|
||||
labels:
|
||||
pipelines.kubeflow.org/pipelinename: ''
|
||||
pipelines.kubeflow.org/generation: ''
|
||||
pipelines.kubeflow.org/cache_enabled: "true"
|
||||
annotations:
|
||||
pipelines.kubeflow.org/component_spec: '{"description": "Proceed when
|
||||
any of the dependents completed successfully", "implementation": {"container":
|
||||
{"args": ["--namespace", "$(context.pipelineRun.namespace)", "--prName",
|
||||
"$(context.pipelineRun.name)", "--taskList", "sleepcomponent-2,sleepcomponent-3",
|
||||
"--statusPath", {"outputPath": "status"}, "-c", "results_flip-coin_output
|
||||
== ''heads''"], "command": ["any-task"], "image": "dspipelines/any-sequencer:latest"}},
|
||||
"name": "any_test", "outputs": [{"description": "The output file to
|
||||
create the status", "name": "status"}]}'
|
||||
tekton.dev/template: ''
|
||||
params:
|
||||
- name: pipelineRun-name
|
||||
|
|
|
|||
Loading…
Reference in New Issue