* Update _client.py * Update _client.py * added pod disruption budget * clean up * Update sdk/python/kfp/dsl/_pipeline.py * fixed parameter * updated after feedback * removed selector
This commit is contained in:
parent
d91a0c9da1
commit
c32ea232d5
|
|
@ -683,6 +683,10 @@ class Compiler(object):
|
|||
if pipeline_conf.ttl_seconds_after_finished >= 0:
|
||||
workflow['spec']['ttlSecondsAfterFinished'] = pipeline_conf.ttl_seconds_after_finished
|
||||
|
||||
if pipeline_conf._pod_disruption_budget_min_available:
|
||||
pod_disruption_budget = {"minAvailable": pipeline_conf._pod_disruption_budget_min_available}
|
||||
workflow['spec']['podDisruptionBudget'] = pod_disruption_budget
|
||||
|
||||
if len(pipeline_conf.image_pull_secrets) > 0:
|
||||
image_pull_secrets = []
|
||||
for image_pull_secret in pipeline_conf.image_pull_secrets:
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@
|
|||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
|
||||
from typing import Union
|
||||
from . import _container_op
|
||||
from . import _resource_op
|
||||
from . import _ops_group
|
||||
|
|
@ -60,6 +60,7 @@ class PipelineConf():
|
|||
self.image_pull_secrets = []
|
||||
self.timeout = 0
|
||||
self.ttl_seconds_after_finished = -1
|
||||
self._pod_disruption_budget_min_available = None
|
||||
self.op_transformers = []
|
||||
self.default_pod_node_selector = {}
|
||||
self.image_pull_policy = None
|
||||
|
|
@ -104,6 +105,18 @@ class PipelineConf():
|
|||
self.ttl_seconds_after_finished = seconds
|
||||
return self
|
||||
|
||||
def set_pod_disruption_budget(self, min_available: Union[int, str]):
|
||||
""" PodDisruptionBudget holds the number of concurrent disruptions that you allow for pipeline Pods.
|
||||
|
||||
Args:
|
||||
min_available (Union[int, str]): An eviction is allowed if at least "minAvailable" pods selected by
|
||||
"selector" will still be available after the eviction, i.e. even in the
|
||||
absence of the evicted pod. So for example you can prevent all voluntary
|
||||
evictions by specifying "100%". "minAvailable" can be either an absolute number or a percentage.
|
||||
"""
|
||||
self._pod_disruption_budget_min_available = min_available
|
||||
return self
|
||||
|
||||
def set_default_pod_node_selector(self, label_name: str, value: str):
|
||||
"""Add a constraint for nodeSelector for a pipeline. Each constraint is a key-value pair label. For the
|
||||
container to be eligible to run on a node, the node must have each of the constraints appeared
|
||||
|
|
|
|||
|
|
@ -674,6 +674,23 @@ implementation:
|
|||
workflow_dict = kfp.compiler.Compiler()._compile(some_pipeline)
|
||||
self.assertEqual(workflow_dict['spec']['ttlSecondsAfterFinished'], 86400)
|
||||
|
||||
def test_pod_disruption_budget(self):
|
||||
"""Test a pipeline with poddisruption budget."""
|
||||
def some_op():
|
||||
return dsl.ContainerOp(
|
||||
name='sleep',
|
||||
image='busybox',
|
||||
command=['sleep 1'],
|
||||
)
|
||||
|
||||
@dsl.pipeline()
|
||||
def some_pipeline():
|
||||
some_op()
|
||||
dsl.get_pipeline_conf().set_pod_disruption_budget("100%")
|
||||
|
||||
workflow_dict = kfp.compiler.Compiler()._compile(some_pipeline)
|
||||
self.assertEqual(workflow_dict['spec']["podDisruptionBudget"]['minAvailable'], "100%")
|
||||
|
||||
def test_op_transformers(self):
|
||||
def some_op():
|
||||
return dsl.ContainerOp(
|
||||
|
|
|
|||
Loading…
Reference in New Issue