pipelines/kubernetes_platform/python/kfp/kubernetes/volume.py

124 lines
5.7 KiB
Python

# Copyright 2023 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, List, Optional, Union
from google.protobuf import json_format
from google.protobuf import message
from kfp import dsl
from kfp.dsl import PipelineTask
from kfp.kubernetes import common
from kfp.kubernetes import kubernetes_executor_config_pb2 as pb
@dsl.container_component
def CreatePVC(
name: dsl.OutputPath(str),
access_modes: List[str],
size: str,
pvc_name: Optional[str] = None,
pvc_name_suffix: Optional[str] = None,
storage_class_name: Optional[str] = '',
volume_name: Optional[str] = None,
annotations: Optional[Dict[str, str]] = None,
):
"""Create a PersistentVolumeClaim, which can be used by downstream tasks.
See `PersistentVolume <https://kubernetes.io/docs/concepts/storage/persistent-volumes/#persistent-volumes>`_ and `PersistentVolumeClaim <https://kubernetes.io/docs/concepts/storage/persistent-volumes/#persistentvolumeclaims>`_ documentation for more information about
the component input parameters.
Args:
access_modes: AccessModes to request for the provisioned PVC. May
be one or more of ``'ReadWriteOnce'``, ``'ReadOnlyMany'``, ``'ReadWriteMany'``, or
``'ReadWriteOncePod'``. Corresponds to `PersistentVolumeClaim.spec.accessModes <https://kubernetes.io/docs/concepts/storage/persistent-volumes/#access-modes>`_.
size: The size of storage requested by the PVC that will be provisioned. For example, ``'5Gi'``. Corresponds to `PersistentVolumeClaim.spec.resources.requests.storage <https://kubernetes.io/docs/reference/kubernetes-api/config-and-storage-resources/persistent-volume-claim-v1/#PersistentVolumeClaimSpec>`_.
pvc_name: Name of the PVC. Corresponds to `PersistentVolumeClaim.metadata.name <https://kubernetes.io/docs/reference/kubernetes-api/config-and-storage-resources/persistent-volume-claim-v1/#PersistentVolumeClaim>`_. Only one of ``pvc_name`` and ``pvc_name_suffix`` can
be provided.
pvc_name_suffix: Prefix to use for a dynamically generated name, which
will take the form ``<argo-workflow-name>-<pvc_name_suffix>``. Only one
of ``pvc_name`` and ``pvc_name_suffix`` can be provided.
storage_class_name: Name of StorageClass from which to provision the PV
to back the PVC. ``None`` indicates to use the cluster's default
storage_class_name. Set to ``''`` for a statically specified PVC.
volume_name: Pre-existing PersistentVolume that should back the
provisioned PersistentVolumeClaim. Used for statically
specified PV only. Corresponds to `PersistentVolumeClaim.spec.volumeName <https://kubernetes.io/docs/reference/kubernetes-api/config-and-storage-resources/persistent-volume-claim-v1/#PersistentVolumeClaimSpec>`_.
annotations: Annotations for the PVC's metadata. Corresponds to `PersistentVolumeClaim.metadata.annotations <https://kubernetes.io/docs/reference/kubernetes-api/config-and-storage-resources/persistent-volume-claim-v1/#PersistentVolumeClaim>`_.
Returns:
``name: str`` \n\t\t\tName of the generated PVC.
"""
return dsl.ContainerSpec(image='argostub/createpvc')
def mount_pvc(
task: PipelineTask,
pvc_name: Union[str, 'PipelineChannel'],
mount_path: str,
) -> PipelineTask:
"""Mount a PersistentVolumeClaim to the task's container.
Args:
task: Pipeline task.
pvc_name: Name of the PVC to mount. Supports passing a runtime-generated name, such as a name provided by ``kubernetes.CreatePvcOp().outputs['name']``.
mount_path: Path to which the PVC should be mounted as a volume.
Returns:
Task object with updated PVC mount configuration.
"""
msg = common.get_existing_kubernetes_config_as_message(task)
pvc_mount = pb.PvcMount(mount_path=mount_path)
pvc_name_from_task = _assign_pvc_name_to_msg(pvc_mount, pvc_name)
if pvc_name_from_task:
task.after(pvc_name.task)
msg.pvc_mount.append(pvc_mount)
task.platform_config['kubernetes'] = json_format.MessageToDict(msg)
return task
@dsl.container_component
def DeletePVC(pvc_name: str):
"""Delete a PersistentVolumeClaim.
Args:
pvc_name: Name of the PVC to delete. Supports passing a runtime-generated name, such as a name provided by ``kubernetes.CreatePvcOp().outputs['name']``.
"""
return dsl.ContainerSpec(image='argostub/deletepvc')
def _assign_pvc_name_to_msg(
msg: message.Message,
pvc_name: Union[str, 'PipelineChannel'],
) -> bool:
"""Assigns pvc_name to the msg's pvc_reference oneof. Returns True if pvc_name is an upstream task output. Else, returns False."""
if isinstance(pvc_name, str):
msg.constant = pvc_name
return False
elif hasattr(pvc_name, 'task_name'):
if pvc_name.task_name is None:
msg.component_input_parameter = pvc_name.name
return False
else:
msg.task_output_parameter.producer_task = pvc_name.task_name
msg.task_output_parameter.output_parameter_key = pvc_name.name
return True
else:
raise ValueError(
f'Argument for {"pvc_name"!r} must be an instance of str or PipelineChannel. Got unknown input type: {type(pvc_name)!r}. '
)