fix #2802: Set ImagePullPolicy per pipeline. (#3534)

* bump version

* default image pull policy

* Update sdk/python/kfp/dsl/_pipeline.py

* task setting should dominate

* Update sdk/python/kfp/dsl/_pipeline.py

* fixed merge misstake
This commit is contained in:
Niklas Hansson 2020-04-23 16:09:13 +02:00 committed by GitHub
parent b63ad7e614
commit 2354776e1e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 92 additions and 2 deletions

View File

@ -686,6 +686,17 @@ class Compiler(object):
if exit_handler:
workflow['spec']['onExit'] = exit_handler.name
if pipeline_conf.image_pull_policy != None:
if pipeline_conf.image_pull_policy in ["Always", "Never", "IfNotPresent"]:
for template in workflow["spec"]["templates"]:
container = template.get('container', None)
if container and "imagePullPolicy" not in container:
container["imagePullPolicy"] = pipeline_conf.image_pull_policy
else:
raise ValueError(
'Invalid imagePullPolicy. Must be one of `Always`, `Never`, `IfNotPresent`.'
)
return workflow
def _validate_exit_handler(self, pipeline):

View File

@ -61,6 +61,7 @@ class PipelineConf():
self.timeout = 0
self.ttl_seconds_after_finished = -1
self.op_transformers = []
self.image_pull_policy = None
def set_image_pull_secrets(self, image_pull_secrets):
"""Configures the pipeline level imagepullsecret
@ -91,6 +92,16 @@ class PipelineConf():
self.ttl_seconds_after_finished = seconds
return self
def set_image_pull_policy(self, policy: str):
"""Configures the default image pull policy
Args:
policy: the pull policy, has to be one of: Always, Never, IfNotPresent.
For more info: https://github.com/kubernetes-client/python/blob/10a7f95435c0b94a6d949ba98375f8cc85a70e5a/kubernetes/docs/V1Container.md
"""
self.image_pull_policy = policy
return self
def add_op_transformer(self, transformer):
"""Configures the op_transformers which will be applied to all ops in the pipeline.
@ -218,5 +229,3 @@ class Pipeline():
metadata (ComponentMeta): component metadata
'''
self._metadata = metadata

View File

@ -678,6 +678,76 @@ implementation:
container = template.get('container', None)
if container:
self.assertEqual(template['retryStrategy']['limit'], 5)
def test_image_pull_policy(self):
def some_op():
return dsl.ContainerOp(
name='sleep',
image='busybox',
command=['sleep 1'],
)
@dsl.pipeline(name='some_pipeline')
def some_pipeline():
task1 = some_op()
task2 = some_op()
task3 = some_op()
dsl.get_pipeline_conf().set_image_pull_policy(policy="Always")
workflow_dict = compiler.Compiler()._compile(some_pipeline)
for template in workflow_dict['spec']['templates']:
container = template.get('container', None)
if container:
self.assertEqual(template['container']['imagePullPolicy'], "Always")
def test_image_pull_policy_step_spec(self):
def some_op():
return dsl.ContainerOp(
name='sleep',
image='busybox',
command=['sleep 1'],
)
def some_other_op():
return dsl.ContainerOp(
name='other',
image='busybox',
command=['sleep 1'],
)
@dsl.pipeline(name='some_pipeline')
def some_pipeline():
task1 = some_op()
task2 = some_op()
task3 = some_other_op().set_image_pull_policy("IfNotPresent")
dsl.get_pipeline_conf().set_image_pull_policy(policy="Always")
workflow_dict = compiler.Compiler()._compile(some_pipeline)
for template in workflow_dict['spec']['templates']:
container = template.get('container', None)
if container:
if template['name' ] == "other":
self.assertEqual(template['container']['imagePullPolicy'], "IfNotPresent")
elif template['name' ] == "sleep":
self.assertEqual(template['container']['imagePullPolicy'], "Always")
def test_image_pull_policy_invalid_setting(self):
def some_op():
return dsl.ContainerOp(
name='sleep',
image='busybox',
command=['sleep 1'],
)
with self.assertRaises(ValueError):
@dsl.pipeline(name='some_pipeline')
def some_pipeline():
task1 = some_op()
task2 = some_op()
dsl.get_pipeline_conf().set_image_pull_policy(policy="Alwayss")
workflow_dict = compiler.Compiler()._compile(some_pipeline)
def test_container_op_output_error_when_no_or_multiple_outputs(self):