Add "set_retry()" on ContainerOp. (#723)
* Add "set_retry()" on ContainerOp. * Follow up on CR comments. * Update docstring. * Increase retry times for test. * Fix test.
This commit is contained in:
parent
9b4088626c
commit
3b3a15e16a
|
|
@ -0,0 +1,42 @@
|
|||
#!/usr/bin/env python3
|
||||
# Copyright 2019 Google LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
import kfp.dsl as dsl
|
||||
|
||||
|
||||
class RandomFailure1Op(dsl.ContainerOp):
    """Container step that exits with a randomly chosen status code.

    The step runs a one-line Python program in the ``python:alpine3.6`` image.
    The program picks one entry from ``exit_codes`` at random, prints it and
    then calls ``sys.exit`` with it.
    """

    def __init__(self, exit_codes):
        # ``exit_codes`` is spliced verbatim between the brackets of a Python
        # list literal inside the inline program, e.g. '0,1,2,3'.
        script = "import random; import sys; exit_code = random.choice([%s]); print(exit_code); sys.exit(exit_code)" % exit_codes
        super().__init__(
            name='random_failure',
            image='python:alpine3.6',
            command=['python', '-c'],
            arguments=[script],
        )
|
||||
|
||||
|
||||
@dsl.pipeline(
    name='pipeline includes two steps which fail randomly.',
    description='shows how to use ContainerOp set_retry().'
)
def retry_sample_pipeline():
    # First step: exit code drawn from 0, 1, 2 or 3; retry count set to 10.
    four_way_step = RandomFailure1Op('0,1,2,3')
    four_way_step.set_retry(10)

    # Second step: exit code drawn from 0 or 1; retry count set to 5.
    coin_flip_step = RandomFailure1Op('0,1')
    coin_flip_step.set_retry(5)
|
||||
|
||||
|
||||
if __name__ == '__main__':
    import kfp.compiler as compiler

    # The compiled package is written next to this script, with '.tar.gz'
    # appended to the script's own path.
    package_path = __file__ + '.tar.gz'
    compiler.Compiler().compile(retry_sample_pipeline, package_path)
|
||||
|
|
@ -329,6 +329,9 @@ class Compiler(object):
|
|||
if op.pod_labels:
|
||||
template['metadata']['labels'] = op.pod_labels
|
||||
|
||||
if op.num_retries:
|
||||
template['retryStrategy'] = {'limit': op.num_retries}
|
||||
|
||||
return template
|
||||
|
||||
def _group_to_template(self, group, inputs, outputs, dependencies):
|
||||
|
|
|
|||
|
|
@ -62,6 +62,7 @@ class ContainerOp(object):
|
|||
self.env_variables = []
|
||||
self.pod_annotations = {}
|
||||
self.pod_labels = {}
|
||||
self.num_retries = 0
|
||||
|
||||
matches = []
|
||||
for arg in (command or []) + (arguments or []):
|
||||
|
|
@ -216,7 +217,6 @@ class ContainerOp(object):
|
|||
|
||||
return self.add_resource_limit("%s.com/gpu" % vendor, gpu)
|
||||
|
||||
|
||||
def add_volume(self, volume):
|
||||
"""Add K8s volume to the container
|
||||
|
||||
|
|
@ -288,5 +288,15 @@ class ContainerOp(object):
|
|||
self.pod_labels[name] = value
|
||||
return self
|
||||
|
||||
def set_retry(self, num_retries: int):
    """Configure how many times a failing task is retried before it is declared failed.

    Args:
      num_retries: Retry count; stored on this op as ``num_retries``.

    Returns:
      This op itself, so that further calls can be chained.
    """
    self.num_retries = num_retries
    return self
|
||||
|
||||
def __repr__(self):
    """Return the string form of a one-entry dict: class name -> instance attributes."""
    class_name = self.__class__.__name__
    attributes = self.__dict__
    return str({class_name: attributes})
|
||||
|
|
|
|||
|
|
@ -228,3 +228,7 @@ class TestCompiler(unittest.TestCase):
|
|||
def test_py_volume(self):
    """Run the shared ``_test_py_compile`` check for the 'volume' case (volume and volume mount)."""
    case_name = 'volume'
    self._test_py_compile(case_name)
|
||||
|
||||
def test_py_retry(self):
    """Run the shared ``_test_py_compile`` check for the 'retry' case (retry functionality)."""
    case_name = 'retry'
    self._test_py_compile(case_name)
|
||||
|
|
|
|||
|
|
@ -0,0 +1,42 @@
|
|||
#!/usr/bin/env python3
|
||||
# Copyright 2018 Google LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
import kfp.dsl as dsl
|
||||
|
||||
|
||||
class RandomFailure1Op(dsl.ContainerOp):
    """Container step that exits with a randomly chosen status code.

    The step runs a one-line Python program in the ``python:alpine3.6`` image.
    The program picks one entry from ``exit_codes`` at random, prints it and
    then calls ``sys.exit`` with it.
    """

    def __init__(self, exit_codes):
        # ``exit_codes`` is spliced verbatim between the brackets of a Python
        # list literal inside the inline program, e.g. '0,1,2,3'.
        script = "import random; import sys; exit_code = random.choice([%s]); print(exit_code); sys.exit(exit_code)" % exit_codes
        super().__init__(
            name='random_failure',
            image='python:alpine3.6',
            command=['python', '-c'],
            arguments=[script],
        )
|
||||
|
||||
|
||||
@dsl.pipeline(
    name='pipeline includes two steps which fail randomly.',
    description='shows how to use ContainerOp set_retry().'
)
def retry_sample_pipeline():
    # First step: exit code drawn from 0, 1, 2 or 3; retry count set to 100.
    four_way_step = RandomFailure1Op('0,1,2,3')
    four_way_step.set_retry(100)

    # Second step: exit code drawn from 0 or 1; retry count set to 50.
    coin_flip_step = RandomFailure1Op('0,1')
    coin_flip_step.set_retry(50)
|
||||
|
||||
|
||||
if __name__ == '__main__':
    import kfp.compiler as compiler

    # The compiled package is written next to this script, with '.tar.gz'
    # appended to the script's own path.
    package_path = __file__ + '.tar.gz'
    compiler.Compiler().compile(retry_sample_pipeline, package_path)
|
||||
|
|
@ -0,0 +1,95 @@
|
|||
apiVersion: argoproj.io/v1alpha1
|
||||
kind: Workflow
|
||||
metadata:
|
||||
generateName: pipeline-includes-two-steps-which-fail-randomly-
|
||||
spec:
|
||||
arguments:
|
||||
parameters: []
|
||||
entrypoint: pipeline-includes-two-steps-which-fail-randomly
|
||||
serviceAccountName: pipeline-runner
|
||||
templates:
|
||||
- dag:
|
||||
tasks:
|
||||
- name: random-failure
|
||||
template: random-failure
|
||||
- name: random-failure-2
|
||||
template: random-failure-2
|
||||
name: pipeline-includes-two-steps-which-fail-randomly
|
||||
- container:
|
||||
args:
|
||||
- import random; import sys; exit_code = random.choice([0,1,2,3]); print(exit_code);
|
||||
sys.exit(exit_code)
|
||||
command:
|
||||
- python
|
||||
- -c
|
||||
image: python:alpine3.6
|
||||
name: random-failure
|
||||
outputs:
|
||||
artifacts:
|
||||
- name: mlpipeline-ui-metadata
|
||||
path: /mlpipeline-ui-metadata.json
|
||||
s3:
|
||||
accessKeySecret:
|
||||
key: accesskey
|
||||
name: mlpipeline-minio-artifact
|
||||
bucket: mlpipeline
|
||||
endpoint: minio-service.kubeflow:9000
|
||||
insecure: true
|
||||
key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-ui-metadata.tgz
|
||||
secretKeySecret:
|
||||
key: secretkey
|
||||
name: mlpipeline-minio-artifact
|
||||
- name: mlpipeline-metrics
|
||||
path: /mlpipeline-metrics.json
|
||||
s3:
|
||||
accessKeySecret:
|
||||
key: accesskey
|
||||
name: mlpipeline-minio-artifact
|
||||
bucket: mlpipeline
|
||||
endpoint: minio-service.kubeflow:9000
|
||||
insecure: true
|
||||
key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-metrics.tgz
|
||||
secretKeySecret:
|
||||
key: secretkey
|
||||
name: mlpipeline-minio-artifact
|
||||
retryStrategy:
|
||||
limit: 100
|
||||
- container:
|
||||
args:
|
||||
- import random; import sys; exit_code = random.choice([0,1]); print(exit_code);
|
||||
sys.exit(exit_code)
|
||||
command:
|
||||
- python
|
||||
- -c
|
||||
image: python:alpine3.6
|
||||
name: random-failure-2
|
||||
outputs:
|
||||
artifacts:
|
||||
- name: mlpipeline-ui-metadata
|
||||
path: /mlpipeline-ui-metadata.json
|
||||
s3:
|
||||
accessKeySecret:
|
||||
key: accesskey
|
||||
name: mlpipeline-minio-artifact
|
||||
bucket: mlpipeline
|
||||
endpoint: minio-service.kubeflow:9000
|
||||
insecure: true
|
||||
key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-ui-metadata.tgz
|
||||
secretKeySecret:
|
||||
key: secretkey
|
||||
name: mlpipeline-minio-artifact
|
||||
- name: mlpipeline-metrics
|
||||
path: /mlpipeline-metrics.json
|
||||
s3:
|
||||
accessKeySecret:
|
||||
key: accesskey
|
||||
name: mlpipeline-minio-artifact
|
||||
bucket: mlpipeline
|
||||
endpoint: minio-service.kubeflow:9000
|
||||
insecure: true
|
||||
key: runs/{{workflow.uid}}/{{pod.name}}/mlpipeline-metrics.tgz
|
||||
secretKeySecret:
|
||||
key: secretkey
|
||||
name: mlpipeline-minio-artifact
|
||||
retryStrategy:
|
||||
limit: 50
|
||||
Loading…
Reference in New Issue