* added resource request at runtime * fixed things * Update to use read only parameter instead * added test case and better example * Updated again * add the validation * add to the test suite * work in progress * update after feedback * fix the test * clean up * clean up * fix the path * add the test again * clean up * fix tests * feedback fix * comment out and clean up
This commit is contained in:
parent
e860fd6ee1
commit
5db843102a
|
|
@ -0,0 +1,48 @@
|
|||
# Copyright 2021 The Kubeflow Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import kfp
|
||||
from kfp import dsl, components
|
||||
from typing import NamedTuple
|
||||
|
||||
@components.create_component_from_func
def training_op(n: int) -> int:
    # Build a list of n integers in one go so that memory use scales with n,
    # then report how many elements were allocated.
    allocated = list(range(n))
    return len(allocated)
|
||||
|
||||
@components.create_component_from_func
def generate_resouce_request() -> NamedTuple('output', [('memory', str), ('cpu', str)]):
    '''Returns the memory and cpu request'''
    # The import stays inside the function body, as in the original component.
    import collections

    # Field order matches the declared return type: memory first, then cpu.
    request_type = collections.namedtuple('output', ['memory', 'cpu'])
    return request_type(memory='500Mi', cpu='200m')
|
||||
|
||||
@dsl.pipeline(
    name='Runtime resource request pipeline',
    description='An example on how to make resource requests at runtime.'
)
def resource_request_pipeline(n: int = 11234567):
    # First step produces the memory/cpu values; its outputs feed the limits
    # of the training step, while the cpu request is a fixed string.
    request_task = generate_resouce_request()
    training_task = (
        training_op(n)
        .set_memory_limit(request_task.outputs['memory'])
        .set_cpu_limit(request_task.outputs['cpu'])
        .set_cpu_request('200m')
    )

    # Disable cache for KFP v1 mode.
    caching = training_task.execution_options.caching_strategy
    caching.max_cache_staleness = 'P0D'
|
||||
|
||||
if __name__ == '__main__':
    # Compile the pipeline into a package named after this script plus '.yaml'.
    package_path = f'{__file__}.yaml'
    compiler = kfp.compiler.Compiler()
    compiler.compile(resource_request_pipeline, package_path)
|
||||
|
|
@ -0,0 +1,48 @@
|
|||
# Copyright 2021 The Kubeflow Authors
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import kfp
|
||||
from .runtime_resource_request import resource_request_pipeline
|
||||
from ...test.util import run_pipeline_func, TestCase
|
||||
|
||||
|
||||
def EXPECTED_OOM(run_id, run, **kwargs):
    """Verify that a sample run that should hit OOM ended as 'Failed'.

    Only the final run status is inspected; the reason for the failure is
    not checked.

    Args:
        run_id: identifier of the run (unused).
        run: object exposing a ``status`` attribute.
        **kwargs: additional arguments, ignored.

    Raises:
        AssertionError: if ``run.status`` is not ``'Failed'``.
    """
    final_status = run.status
    assert final_status == 'Failed'
|
||||
|
||||
|
||||
# Run the sample in v1 legacy mode twice: once with the default input, and
# once with a larger `n` whose run is expected to end as 'Failed'.
run_pipeline_func([
    TestCase(
        mode=kfp.dsl.PipelineExecutionMode.V1_LEGACY,
        pipeline_func=resource_request_pipeline,
    ),
    # TODO: blocked by https://github.com/kubeflow/pipelines/issues/5835
    # Once unblocked, add the same default-input case with
    # mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE.
    TestCase(
        mode=kfp.dsl.PipelineExecutionMode.V1_LEGACY,
        pipeline_func=resource_request_pipeline,
        verify_func=EXPECTED_OOM,
        arguments={'n': 21234567},
    ),
    # TODO: blocked by https://github.com/kubeflow/pipelines/issues/5835
    # Once unblocked, add the same expected-failure case
    # (arguments={'n': 21234567}, verify_func=EXPECTED_OOM) with
    # mode=kfp.dsl.PipelineExecutionMode.V2_COMPATIBLE.
])
|
||||
|
|
@ -40,6 +40,8 @@
|
|||
path: samples.core.loop_parallelism.loop_parallelism_test
|
||||
- name: resource_spec
|
||||
path: samples.core.resource_spec.resource_spec_test
|
||||
- name: runtime_resource_spec
|
||||
path: samples.core.resource_spec.runtime_resource_request_test
|
||||
- name: xgboost_sample
|
||||
path: samples.core.XGBoost.xgboost_sample_test
|
||||
- name: use_run_id
|
||||
|
|
|
|||
|
|
@ -288,6 +288,24 @@ def _op_to_template(op: BaseOp):
|
|||
template['volumes'] = [convert_k8s_obj_to_json(volume) for volume in processed_op.volumes]
|
||||
template['volumes'].sort(key=lambda x: x['name'])
|
||||
|
||||
# Runtime resource requests
|
||||
if isinstance(op, dsl.ContainerOp) and ('resources' in op.container.keys()):
|
||||
podSpecPatch = {}
|
||||
for setting, val in op.container['resources'].items():
|
||||
for resource, param in val.items():
|
||||
if (resource in ['cpu', 'memory']) and re.match('^{{inputs.parameters.*}}$', param):
|
||||
if not 'containers' in podSpecPatch:
|
||||
podSpecPatch = {'containers':[{'name':'main', 'resources':{}}]}
|
||||
if setting not in podSpecPatch['containers'][0]['resources']:
|
||||
podSpecPatch['containers'][0]['resources'][setting] = {resource: param}
|
||||
else:
|
||||
podSpecPatch['containers'][0]['resources'][setting][resource] = param
|
||||
del template['container']['resources'][setting][resource]
|
||||
if not template['container']['resources'][setting]:
|
||||
del template['container']['resources'][setting]
|
||||
if podSpecPatch:
|
||||
template['podSpecPatch'] = json.dumps(podSpecPatch)
|
||||
|
||||
if isinstance(op, dsl.ContainerOp) and op._metadata and not op.is_v2:
|
||||
template.setdefault('metadata', {}).setdefault('annotations', {})['pipelines.kubeflow.org/component_spec'] = json.dumps(op._metadata.to_dict(), sort_keys=True)
|
||||
|
||||
|
|
|
|||
|
|
@ -289,27 +289,29 @@ class Container(V1Container):
|
|||
self.resources.requests.update({resource_name: value})
|
||||
return self
|
||||
|
||||
def set_memory_request(self, memory: Union[str, _pipeline_param.PipelineParam]) -> 'Container':
    """Set memory request (minimum) for this operator.

    Args:
      memory(Union[str, PipelineParam]): a string which can be a number or
        a number followed by one of "E", "P", "T", "G", "M", "K", or a
        PipelineParam. Only non-PipelineParam values are validated here.

    Returns:
      The result of ``add_resource_request('memory', memory)``.
    """
    is_pipeline_param = isinstance(memory, _pipeline_param.PipelineParam)
    if not is_pipeline_param:
        # A PipelineParam has no concrete value to validate at this point.
        self._validate_size_string(memory)
    return self.add_resource_request('memory', memory)
|
||||
|
||||
def set_memory_limit(self, memory: Union[str, _pipeline_param.PipelineParam]) -> 'Container':
    """Set memory limit (maximum) for this operator.

    Args:
      memory(Union[str, PipelineParam]): a string which can be a number or
        a number followed by one of "E", "P", "T", "G", "M", "K", or a
        PipelineParam. For a PipelineParam, both the validation and the
        ``_container_spec`` update are skipped.

    Returns:
      The result of ``add_resource_limit('memory', memory)``.
    """
    if isinstance(memory, _pipeline_param.PipelineParam):
        # Nothing concrete to validate or convert; record the limit as-is.
        return self.add_resource_limit('memory', memory)

    self._validate_size_string(memory)
    if self._container_spec:
        self._container_spec.resources.memory_limit = _get_resource_number(memory)
    return self.add_resource_limit('memory', memory)
|
||||
|
||||
def set_ephemeral_storage_request(self, size) -> 'Container':
|
||||
|
|
@ -332,27 +334,29 @@ class Container(V1Container):
|
|||
self._validate_size_string(size)
|
||||
return self.add_resource_limit('ephemeral-storage', size)
|
||||
|
||||
def set_cpu_request(self, cpu: Union[str, _pipeline_param.PipelineParam]) -> 'Container':
    """Set cpu request (minimum) for this operator.

    Args:
      cpu(Union[str, PipelineParam]): A string which can be a number or a
        number followed by "m", which means 1/1000, or a PipelineParam.
        Only non-PipelineParam values are validated here.

    Returns:
      The result of ``add_resource_request('cpu', cpu)``.
    """
    is_pipeline_param = isinstance(cpu, _pipeline_param.PipelineParam)
    if not is_pipeline_param:
        # A PipelineParam has no concrete value to validate at this point.
        self._validate_cpu_string(cpu)
    return self.add_resource_request('cpu', cpu)
|
||||
|
||||
def set_cpu_limit(self, cpu: Union[str, _pipeline_param.PipelineParam]) -> 'Container':
    """Set cpu limit (maximum) for this operator.

    Args:
      cpu(Union[str, PipelineParam]): A string which can be a number or a
        number followed by "m", which means 1/1000, or a PipelineParam.
        For a PipelineParam, both the validation and the
        ``_container_spec`` update are skipped.

    Returns:
      The result of ``add_resource_limit('cpu', cpu)``.
    """
    if isinstance(cpu, _pipeline_param.PipelineParam):
        # Nothing concrete to validate or convert; record the limit as-is.
        return self.add_resource_limit('cpu', cpu)

    self._validate_cpu_string(cpu)
    if self._container_spec:
        self._container_spec.resources.cpu_limit = _get_cpu_number(cpu)
    return self.add_resource_limit('cpu', cpu)
|
||||
|
||||
def set_gpu_limit(self, gpu, vendor='nvidia') -> 'Container':
|
||||
|
|
|
|||
|
|
@ -406,6 +406,20 @@ class TestCompiler(unittest.TestCase):
|
|||
self.assertEqual(template['retryStrategy']['backoff']['maxDuration'], backoff_max_duration)
|
||||
|
||||
|
||||
def test_py_runtime_memory_request(self):
    """Check that a resource request given as a pipeline parameter lands in podSpecPatch."""
    # NOTE(review): despite the test name, the `memory` parameter is passed to
    # set_cpu_request and the `cpu` parameter is unused - confirm this is intended.

    def my_pipeline(memory: str, cpu: str):
        some_op().set_cpu_request(memory)

    spec = kfp.compiler.Compiler()._create_workflow(my_pipeline)['spec']

    # Index templates by name so the entrypoint DAG and its first task can be looked up.
    templates_by_name = {}
    for tmpl in spec['templates']:
        templates_by_name[tmpl['name']] = tmpl
    entry_tasks = templates_by_name[spec['entrypoint']]['dag']['tasks']
    first_task_template = templates_by_name[entry_tasks[0]['template']]

    expected_patch = '{"containers": [{"name": "main", "resources": {"requests": {"cpu": "{{inputs.parameters.memory}}"}}}]}'
    self.assertEqual(first_task_template['podSpecPatch'], expected_patch)
|
||||
|
||||
|
||||
def test_py_retry_policy_invalid(self):
|
||||
def my_pipeline():
|
||||
some_op().set_retry(2, 'Invalid')
|
||||
|
|
|
|||
Loading…
Reference in New Issue