[Feature] Set ttlSecondsAfterFinished in argo workflow with PipelineConf (#1594)

* Add PipelineConf method to set ttlSecondsAfterFinished in argo workflow spec

* remove unnecessary compile test for ttl. add unit test for ttl instead.
This commit is contained in:
Eterna2 2019-07-25 00:26:16 +08:00 committed by Kubernetes Prow Robot
parent fe639f4166
commit 08ff76f5f1
3 changed files with 30 additions and 0 deletions

View File

@ -545,6 +545,10 @@ class Compiler(object):
'serviceAccountName': 'pipeline-runner'
}
}
# set ttl after workflow finishes
if pipeline.conf.ttl_seconds_after_finished >= 0:
workflow['spec']['ttlSecondsAfterFinished'] = pipeline.conf.ttl_seconds_after_finished
if len(pipeline.conf.image_pull_secrets) > 0:
image_pull_secrets = []
for image_pull_secret in pipeline.conf.image_pull_secrets:

View File

@ -57,6 +57,7 @@ class PipelineConf():
def __init__(self):
self.image_pull_secrets = []
self.timeout = 0
self.ttl_seconds_after_finished = -1
self.artifact_location = None
self.op_transformers = []
@ -80,6 +81,15 @@ class PipelineConf():
self.timeout = seconds
return self
def set_ttl_seconds_after_finished(self, seconds: int):
"""Configures the ttl after the pipeline has finished.
Args:
seconds: number of seconds for the workflow to be garbage collected after it is finished.
"""
self.ttl_seconds_after_finished = seconds
return self
def set_artifact_location(self, artifact_location):
"""Configures the pipeline level artifact location.

View File

@ -523,6 +523,22 @@ implementation:
template = workflow_dict['spec']['templates'][0]
self.assertEqual(template['metadata']['annotations']['pipelines.kubeflow.org/task_display_name'], 'Custom name')
def test_set_ttl_seconds_after_finished(self):
"""Test a pipeline with ttl after finished."""
def some_op():
return dsl.ContainerOp(
name='sleep',
image='busybox',
command=['sleep 1'],
)
@dsl.pipeline()
def some_pipeline():
some_op()
dsl.get_pipeline_conf().set_ttl_seconds_after_finished(86400)
workflow_dict = kfp.compiler.Compiler()._compile(some_pipeline)
self.assertEqual(workflow_dict['spec']['ttlSecondsAfterFinished'], 86400)
def test_op_transformers(self):
def some_op():