Add timeout in dsl (#1465)

* add timeout in dsl
* add pipeline level timeout
Ning 2019-06-06 17:42:10 -07:00 committed by GitHub
parent 7d69cda69c
commit 5061fcffcf
7 changed files with 130 additions and 0 deletions


@@ -245,6 +245,10 @@ def _op_to_template(op: BaseOp):
  if processed_op.num_retries:
    template['retryStrategy'] = {'limit': processed_op.num_retries}

  # timeout
  if processed_op.timeout:
    template['activeDeadlineSeconds'] = processed_op.timeout

  # sidecars
  if processed_op.sidecars:
    template['sidecars'] = processed_op.sidecars
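To make the op-level mapping concrete, here is a minimal, self-contained sketch (not part of this commit; `SimpleOp` and `op_to_template` are illustrative stand-ins for the SDK's processed op and `_op_to_template`):

class SimpleOp:
  """Hypothetical stand-in for a processed op; only the fields used below."""
  def __init__(self, name, timeout=0, num_retries=0, sidecars=None):
    self.name = name
    self.timeout = timeout
    self.num_retries = num_retries
    self.sidecars = sidecars

def op_to_template(processed_op):
  template = {'name': processed_op.name}
  if processed_op.num_retries:
    template['retryStrategy'] = {'limit': processed_op.num_retries}
  # A non-zero timeout becomes the Argo template's activeDeadlineSeconds.
  if processed_op.timeout:
    template['activeDeadlineSeconds'] = processed_op.timeout
  if processed_op.sidecars:
    template['sidecars'] = processed_op.sidecars
  return template

print(op_to_template(SimpleOp('random-failure', timeout=10)))
# -> {'name': 'random-failure', 'activeDeadlineSeconds': 10}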


@@ -551,6 +551,10 @@ class Compiler(object):
      for image_pull_secret in pipeline.conf.image_pull_secrets:
        image_pull_secrets.append(K8sHelper.convert_k8s_obj_to_json(image_pull_secret))
      workflow['spec']['imagePullSecrets'] = image_pull_secrets

    if pipeline.conf.timeout:
      workflow['spec']['activeDeadlineSeconds'] = pipeline.conf.timeout

    if exit_handler:
      workflow['spec']['onExit'] = exit_handler.name
    if volumes:
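The pipeline-level value follows the same pattern but is written to the workflow spec rather than to an individual template: in Argo, a workflow-level activeDeadlineSeconds bounds the duration of the whole run, while the per-template value above bounds a single step. A minimal sketch (simplified; the real code operates on the full workflow dict built by the compiler):

workflow = {'apiVersion': 'argoproj.io/v1alpha1', 'kind': 'Workflow', 'spec': {}}

pipeline_timeout = 50  # e.g. the value set via dsl.get_pipeline_conf().set_timeout(50)
if pipeline_timeout:
  workflow['spec']['activeDeadlineSeconds'] = pipeline_timeout

print(workflow['spec'])  # -> {'activeDeadlineSeconds': 50}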


@@ -688,6 +688,7 @@ class BaseOp(object):
    self.pod_annotations = {}
    self.pod_labels = {}
    self.num_retries = 0
    self.timeout = 0
    self.sidecars = sidecars or []

    # attributes specific to `BaseOp`
@@ -807,6 +808,16 @@ class BaseOp(object):
    self.num_retries = num_retries
    return self

  def set_timeout(self, seconds: int):
    """Sets the timeout for the task in seconds.

    Args:
      seconds: Number of seconds.
    """
    self.timeout = seconds
    return self

  def add_sidecar(self, sidecar: Sidecar):
    """Add a sidecar to the Op.


@@ -56,6 +56,7 @@ class PipelineConf():
  """
  def __init__(self):
    self.image_pull_secrets = []
    self.timeout = 0
    self.artifact_location = None
    self.op_transformers = []
@@ -70,6 +71,15 @@ class PipelineConf():
    self.image_pull_secrets = image_pull_secrets
    return self

  def set_timeout(self, seconds: int):
    """Configures the pipeline-level timeout.

    Args:
      seconds: Number of seconds for the timeout.
    """
    self.timeout = seconds
    return self

  def set_artifact_location(self, artifact_location):
    """Configures the pipeline level artifact location.
@@ -112,6 +122,7 @@ class PipelineConf():
    """
    self.op_transformers.append(transformer)


def get_pipeline_conf():
  """Configures the pipeline-level settings for the current pipeline.

  Note: call this function inside the user-defined pipeline function.
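Usage sketch for the pipeline-level setter (hypothetical pipeline; assumes the kfp SDK at this version is installed). As the docstring notes, get_pipeline_conf() must be called inside the pipeline function:

import kfp.dsl as dsl

@dsl.pipeline(
  name='pipeline timeout demo',
  description='Illustrative only: sets a whole-pipeline timeout.'
)
def pipeline_timeout_demo():
  # Bound the whole run at 30 minutes; compiles to workflow spec activeDeadlineSeconds.
  dsl.get_pipeline_conf().set_timeout(1800)
  echo = dsl.ContainerOp(
    name='echo',
    image='alpine:3.6',
    command=['sh', '-c', 'echo hello'])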


@@ -322,6 +322,10 @@ class TestCompiler(unittest.TestCase):
    """Test pipeline imagepullsecret."""
    self._test_py_compile_yaml('imagepullsecret')

  def test_py_timeout(self):
    """Test pipeline timeout."""
    self._test_py_compile_yaml('timeout')

  def test_py_recursive_do_while(self):
    """Test pipeline recursive."""
    self._test_py_compile_yaml('recursive_do_while')

sdk/python/tests/compiler/testdata/timeout.py (new executable file)

@@ -0,0 +1,43 @@
#!/usr/bin/env python3
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.


import kfp.dsl as dsl


class RandomFailure1Op(dsl.ContainerOp):
  """A component that fails randomly."""

  def __init__(self, exit_codes):
    super(RandomFailure1Op, self).__init__(
      name='random_failure',
      image='python:alpine3.6',
      command=['python', '-c'],
      arguments=["import random; import sys; exit_code = random.choice([%s]); print(exit_code); sys.exit(exit_code)" % exit_codes])


@dsl.pipeline(
  name='pipeline includes two steps which fail randomly.',
  description='shows how to use ContainerOp set_timeout().'
)
def retry_sample_pipeline():
  op1 = RandomFailure1Op('0,1,2,3').set_timeout(10)
  op2 = RandomFailure1Op('0,1')
  dsl.get_pipeline_conf().set_timeout(50)


if __name__ == '__main__':
  import kfp.compiler as compiler
  compiler.Compiler().compile(retry_sample_pipeline, __file__ + '.tar.gz')
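One way to sanity-check the compiled sample above (a sketch, assuming PyYAML is available and that the tar.gz written by the compiler contains a single YAML workflow file):

import tarfile
import yaml  # PyYAML, assumed installed

with tarfile.open('timeout.py.tar.gz') as tar:
  member = tar.getmembers()[0]
  workflow = yaml.safe_load(tar.extractfile(member).read())

# Pipeline-level timeout from dsl.get_pipeline_conf().set_timeout(50).
print(workflow['spec']['activeDeadlineSeconds'])  # 50

# Op-level timeout from .set_timeout(10) on the first op.
for template in workflow['spec']['templates']:
  if 'activeDeadlineSeconds' in template:
    print(template['name'], template['activeDeadlineSeconds'])  # random-failure 10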


@@ -0,0 +1,53 @@
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: pipeline-includes-two-steps-which-fail-randomly-
spec:
  activeDeadlineSeconds: 50
  arguments:
    parameters: []
  entrypoint: pipeline-includes-two-steps-which-fail-randomly
  serviceAccountName: pipeline-runner
  templates:
  - dag:
      tasks:
      - name: random-failure
        template: random-failure
      - name: random-failure-2
        template: random-failure-2
    name: pipeline-includes-two-steps-which-fail-randomly
  - activeDeadlineSeconds: 10
    container:
      args:
      - import random; import sys; exit_code = random.choice([0,1,2,3]); print(exit_code);
        sys.exit(exit_code)
      command:
      - python
      - -c
      image: python:alpine3.6
    name: random-failure
    outputs:
      artifacts:
      - name: mlpipeline-ui-metadata
        optional: true
        path: /mlpipeline-ui-metadata.json
      - name: mlpipeline-metrics
        optional: true
        path: /mlpipeline-metrics.json
  - container:
      args:
      - import random; import sys; exit_code = random.choice([0,1]); print(exit_code);
        sys.exit(exit_code)
      command:
      - python
      - -c
      image: python:alpine3.6
    name: random-failure-2
    outputs:
      artifacts:
      - name: mlpipeline-ui-metadata
        optional: true
        path: /mlpipeline-ui-metadata.json
      - name: mlpipeline-metrics
        optional: true
        path: /mlpipeline-metrics.json