From 3b3a15e16ac46f082f94f58fba23348510b60ce9 Mon Sep 17 00:00:00 2001 From: qimingj Date: Wed, 23 Jan 2019 17:35:34 -0800 Subject: [PATCH] Add "set_retry()" on ContainerOp. (#723) * Add "set_retry()" on ContainerOp. * Follow up on CR comments. * Update docstring. * Increase retry times for test. * Fix test. --- samples/basic/retry.py | 42 ++++++++ sdk/python/kfp/compiler/compiler.py | 3 + sdk/python/kfp/dsl/_container_op.py | 12 ++- sdk/python/tests/compiler/compiler_tests.py | 4 + sdk/python/tests/compiler/testdata/retry.py | 42 ++++++++ sdk/python/tests/compiler/testdata/retry.yaml | 95 +++++++++++++++++++ 6 files changed, 197 insertions(+), 1 deletion(-) create mode 100755 samples/basic/retry.py create mode 100755 sdk/python/tests/compiler/testdata/retry.py create mode 100644 sdk/python/tests/compiler/testdata/retry.yaml diff --git a/samples/basic/retry.py b/samples/basic/retry.py new file mode 100755 index 0000000000..4d5e244291 --- /dev/null +++ b/samples/basic/retry.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +# Copyright 2019 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import kfp.dsl as dsl + + +class RandomFailure1Op(dsl.ContainerOp): + """A component that fails randomly.""" + + def __init__(self, exit_codes): + super(RandomFailure1Op, self).__init__( + name='random_failure', + image='python:alpine3.6', + command=['python', '-c'], + arguments=["import random; import sys; exit_code = random.choice([%s]); print(exit_code); sys.exit(exit_code)" % exit_codes]) + + +@dsl.pipeline( + name='pipeline includes two steps which fail randomly.', + description='shows how to use ContainerOp set_retry().' +) +def retry_sample_pipeline(): + op1 = RandomFailure1Op('0,1,2,3').set_retry(10) + op2 = RandomFailure1Op('0,1').set_retry(5) + + +if __name__ == '__main__': + import kfp.compiler as compiler + compiler.Compiler().compile(retry_sample_pipeline, __file__ + '.tar.gz') diff --git a/sdk/python/kfp/compiler/compiler.py b/sdk/python/kfp/compiler/compiler.py index 7fb3d71614..378d916c12 100644 --- a/sdk/python/kfp/compiler/compiler.py +++ b/sdk/python/kfp/compiler/compiler.py @@ -329,6 +329,9 @@ class Compiler(object): if op.pod_labels: template['metadata']['labels'] = op.pod_labels + if op.num_retries: + template['retryStrategy'] = {'limit': op.num_retries} + return template def _group_to_template(self, group, inputs, outputs, dependencies): diff --git a/sdk/python/kfp/dsl/_container_op.py b/sdk/python/kfp/dsl/_container_op.py index f7c56fd5b8..90e2785480 100644 --- a/sdk/python/kfp/dsl/_container_op.py +++ b/sdk/python/kfp/dsl/_container_op.py @@ -62,6 +62,7 @@ class ContainerOp(object): self.env_variables = [] self.pod_annotations = {} self.pod_labels = {} + self.num_retries = 0 matches = [] for arg in (command or []) + (arguments or []): @@ -216,7 +217,6 @@ class ContainerOp(object): return self.add_resource_limit("%s.com/gpu" % vendor, gpu) - def add_volume(self, volume): """Add K8s volume to the container @@ -288,5 +288,15 @@ class ContainerOp(object): self.pod_labels[name] = value return self + def set_retry(self, num_retries: int): + """Sets the number of times the task is retried until it's declared failed. + + Args: + num_retries: Number of times to retry on failures. + """ + + self.num_retries = num_retries + return self + def __repr__(self): return str({self.__class__.__name__: self.__dict__}) diff --git a/sdk/python/tests/compiler/compiler_tests.py b/sdk/python/tests/compiler/compiler_tests.py index b0faafd575..6ea350e168 100644 --- a/sdk/python/tests/compiler/compiler_tests.py +++ b/sdk/python/tests/compiler/compiler_tests.py @@ -228,3 +228,7 @@ class TestCompiler(unittest.TestCase): def test_py_volume(self): """Test a pipeline with a volume and volume mount.""" self._test_py_compile('volume') + + def test_py_retry(self): + """Test retry functionality.""" + self._test_py_compile('retry') diff --git a/sdk/python/tests/compiler/testdata/retry.py b/sdk/python/tests/compiler/testdata/retry.py new file mode 100755 index 0000000000..030fc1ddc4 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/retry.py @@ -0,0 +1,42 @@ +#!/usr/bin/env python3 +# Copyright 2018 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import kfp.dsl as dsl + + +class RandomFailure1Op(dsl.ContainerOp): + """A component that fails randomly.""" + + def __init__(self, exit_codes): + super(RandomFailure1Op, self).__init__( + name='random_failure', + image='python:alpine3.6', + command=['python', '-c'], + arguments=["import random; import sys; exit_code = random.choice([%s]); print(exit_code); sys.exit(exit_code)" % exit_codes]) + + +@dsl.pipeline( + name='pipeline includes two steps which fail randomly.', + description='shows how to use ContainerOp set_retry().' +) +def retry_sample_pipeline(): + op1 = RandomFailure1Op('0,1,2,3').set_retry(100) + op2 = RandomFailure1Op('0,1').set_retry(50) + + +if __name__ == '__main__': + import kfp.compiler as compiler + compiler.Compiler().compile(retry_sample_pipeline, __file__ + '.tar.gz') diff --git a/sdk/python/tests/compiler/testdata/retry.yaml b/sdk/python/tests/compiler/testdata/retry.yaml new file mode 100644 index 0000000000..d5a619efc4 --- /dev/null +++ b/sdk/python/tests/compiler/testdata/retry.yaml @@ -0,0 +1,95 @@ +apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + generateName: pipeline-includes-two-steps-which-fail-randomly- +spec: + arguments: + parameters: [] + entrypoint: pipeline-includes-two-steps-which-fail-randomly + serviceAccountName: pipeline-runner + templates: + - dag: + tasks: + - name: random-failure + template: random-failure + - name: random-failure-2 + template: random-failure-2 + name: pipeline-includes-two-steps-which-fail-randomly + - container: + args: + - import random; import sys; exit_code = random.choice([0,1,2,3]); print(exit_code); + sys.exit(exit_code) + command: + - python + - -c + image: python:alpine3.6 + name: random-failure + outputs: + artifacts: + - name: mlpipeline-ui-metadata + path: /mlpipeline-ui-metadata.json + s3: + accessKeySecret: + key: accesskey + name: mlpipeline-minio-artifact + bucket: mlpipeline + endpoint: minio-service.kubeflow:9000 + insecure: true + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-ui-metadata.tgz + secretKeySecret: + key: secretkey + name: mlpipeline-minio-artifact + - name: mlpipeline-metrics + path: /mlpipeline-metrics.json + s3: + accessKeySecret: + key: accesskey + name: mlpipeline-minio-artifact + bucket: mlpipeline + endpoint: minio-service.kubeflow:9000 + insecure: true + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-metrics.tgz + secretKeySecret: + key: secretkey + name: mlpipeline-minio-artifact + retryStrategy: + limit: 100 + - container: + args: + - import random; import sys; exit_code = random.choice([0,1]); print(exit_code); + sys.exit(exit_code) + command: + - python + - -c + image: python:alpine3.6 + name: random-failure-2 + outputs: + artifacts: + - name: mlpipeline-ui-metadata + path: /mlpipeline-ui-metadata.json + s3: + accessKeySecret: + key: accesskey + name: mlpipeline-minio-artifact + bucket: mlpipeline + endpoint: minio-service.kubeflow:9000 + insecure: true + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-ui-metadata.tgz + secretKeySecret: + key: secretkey + name: mlpipeline-minio-artifact + - name: mlpipeline-metrics + path: /mlpipeline-metrics.json + s3: + accessKeySecret: + key: accesskey + name: mlpipeline-minio-artifact + bucket: mlpipeline + endpoint: minio-service.kubeflow:9000 + insecure: true + key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-metrics.tgz + secretKeySecret: + key: secretkey + name: mlpipeline-minio-artifact + retryStrategy: + limit: 50