# Copyright 2023 The Kubeflow Authors # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from typing import Dict, List, Optional, Union from google.protobuf import json_format from google.protobuf import message from kfp import dsl from kfp.dsl import PipelineTask from kfp.kubernetes import common from kfp.kubernetes import kubernetes_executor_config_pb2 as pb @dsl.container_component def CreatePVC( name: dsl.OutputPath(str), access_modes: List[str], size: str, pvc_name: Optional[str] = None, pvc_name_suffix: Optional[str] = None, storage_class_name: Optional[str] = '', volume_name: Optional[str] = None, annotations: Optional[Dict[str, str]] = None, ): """Create a PersistentVolumeClaim, which can be used by downstream tasks. See `PersistentVolume `_ and `PersistentVolumeClaim `_ documentation for more information about the component input parameters. Args: access_modes: AccessModes to request for the provisioned PVC. May be one or more of ``'ReadWriteOnce'``, ``'ReadOnlyMany'``, ``'ReadWriteMany'``, or ``'ReadWriteOncePod'``. Corresponds to `PersistentVolumeClaim.spec.accessModes `_. size: The size of storage requested by the PVC that will be provisioned. For example, ``'5Gi'``. Corresponds to `PersistentVolumeClaim.spec.resources.requests.storage `_. pvc_name: Name of the PVC. Corresponds to `PersistentVolumeClaim.metadata.name `_. Only one of ``pvc_name`` and ``pvc_name_suffix`` can be provided. pvc_name_suffix: Prefix to use for a dynamically generated name, which will take the form ``-``. Only one of ``pvc_name`` and ``pvc_name_suffix`` can be provided. storage_class_name: Name of StorageClass from which to provision the PV to back the PVC. ``None`` indicates to use the cluster's default storage_class_name. Set to ``''`` for a statically specified PVC. volume_name: Pre-existing PersistentVolume that should back the provisioned PersistentVolumeClaim. Used for statically specified PV only. Corresponds to `PersistentVolumeClaim.spec.volumeName `_. annotations: Annotations for the PVC's metadata. Corresponds to `PersistentVolumeClaim.metadata.annotations `_. Returns: ``name: str`` \n\t\t\tName of the generated PVC. """ return dsl.ContainerSpec(image='argostub/createpvc') def mount_pvc( task: PipelineTask, pvc_name: Union[str, 'PipelineChannel'], mount_path: str, ) -> PipelineTask: """Mount a PersistentVolumeClaim to the task's container. Args: task: Pipeline task. pvc_name: Name of the PVC to mount. Supports passing a runtime-generated name, such as a name provided by ``kubernetes.CreatePvcOp().outputs['name']``. mount_path: Path to which the PVC should be mounted as a volume. Returns: Task object with updated PVC mount configuration. """ msg = common.get_existing_kubernetes_config_as_message(task) pvc_mount = pb.PvcMount(mount_path=mount_path) pvc_name_from_task = _assign_pvc_name_to_msg(pvc_mount, pvc_name) if pvc_name_from_task: task.after(pvc_name.task) msg.pvc_mount.append(pvc_mount) task.platform_config['kubernetes'] = json_format.MessageToDict(msg) return task @dsl.container_component def DeletePVC(pvc_name: str): """Delete a PersistentVolumeClaim. Args: pvc_name: Name of the PVC to delete. Supports passing a runtime-generated name, such as a name provided by ``kubernetes.CreatePvcOp().outputs['name']``. """ return dsl.ContainerSpec(image='argostub/deletepvc') def _assign_pvc_name_to_msg( msg: message.Message, pvc_name: Union[str, 'PipelineChannel'], ) -> bool: """Assigns pvc_name to the msg's pvc_reference oneof. Returns True if pvc_name is an upstream task output. Else, returns False.""" if isinstance(pvc_name, str): msg.constant = pvc_name return False elif hasattr(pvc_name, 'task_name'): if pvc_name.task_name is None: msg.component_input_parameter = pvc_name.name return False else: msg.task_output_parameter.producer_task = pvc_name.task_name msg.task_output_parameter.output_parameter_key = pvc_name.name return True else: raise ValueError( f'Argument for {"pvc_name"!r} must be an instance of str or PipelineChannel. Got unknown input type: {type(pvc_name)!r}. ' )