pipelines/kubernetes_platform/python
Connor McCarthy 236cbd0bd1
chore(sdk): release kfp-kubernetes 1.0.0 (#9662)
2023-06-20 20:39:40 +00:00
docs chore: add kfp-kubernetes docs and update process infrastructure (#8976) 2023-03-15 18:23:12 +00:00
kfp/kubernetes chore(sdk): release kfp-kubernetes 1.0.0 (#9662) 2023-06-20 20:39:40 +00:00
test chore: change sample PVC access modes to ReadWriteOnce (#9425) 2023-05-18 01:22:06 +00:00
README.md chore: change sample PVC access modes to ReadWriteOnce (#9425) 2023-05-18 01:22:06 +00:00
create_release_branch.sh chore: add kfp-kubernetes docs and update process infrastructure (#8976) 2023-03-15 18:23:12 +00:00
generate_proto.py feat: add kubernetes platform-specific protos, generated Go code, and Python package (#8888) 2023-03-04 07:10:33 +00:00
release.sh chore: add kfp-kubernetes docs and update process infrastructure (#8976) 2023-03-15 18:23:12 +00:00
setup.py chore(sdk): add kfp-kubernetes documentation to PyPI (#9148) 2023-04-13 20:47:43 +00:00

README.md

Kubernetes Platform-specific Features

The kfp-kubernetes Python library enables authoring Kubeflow pipelines with Kubernetes-specific features. These features are supported by the default open source KFP backend. Specifically, the kfp-kubernetes library supports authoring pipelines that use Secrets and PersistentVolumeClaims.

See the kfp-kubernetes reference documentation.

Installation

The kfp-kubernetes package can be installed as a kfp SDK extra dependency with kfp==2.x.x:

pip install "kfp[kubernetes]" --pre

Or installed independently:

pip install kfp-kubernetes
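
To confirm which versions ended up in the environment after either install command, a small check along these lines may help. It is a minimal sketch that uses only the standard library; the helper name installed_version is hypothetical and not part of kfp or kfp-kubernetes.

```python
from importlib import metadata
from typing import Optional


def installed_version(dist_name: str) -> Optional[str]:
    """Return the installed version of a distribution, or None if it is absent."""
    try:
        return metadata.version(dist_name)
    except metadata.PackageNotFoundError:
        return None


# Report both distributions named in the install commands above.
for dist in ('kfp', 'kfp-kubernetes'):
    print(f'{dist}: {installed_version(dist) or "not installed"}')
```

If kfp-kubernetes is reported as not installed, the "from kfp import kubernetes" import used in the examples is expected to fail.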

Example usage

Secret: As environment variable

from kfp import dsl
from kfp import kubernetes

@dsl.component
def print_secret():
    import os
    # SECRET_VAR is the env var that secret_key_to_env maps the 'password' key to
    print(os.environ['SECRET_VAR'])

@dsl.pipeline
def pipeline():
    task = print_secret()
    kubernetes.use_secret_as_env(task,
                                 secret_name='my-secret',
                                 secret_key_to_env={'password': 'SECRET_VAR'})
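
The pipeline above assumes that a Secret named my-secret with a password key already exists in the namespace where the pipeline runs. As a sketch of how such a Secret could be prepared, the following builds a manifest with the base64-encoded values that the Kubernetes Secret data field expects. The helper name secret_manifest and the value 'example-password' are hypothetical placeholders.

```python
import base64
import json


def secret_manifest(name: str, values: dict) -> dict:
    """Build an Opaque Secret manifest; the 'data' field holds base64-encoded values."""
    return {
        'apiVersion': 'v1',
        'kind': 'Secret',
        'metadata': {'name': name},
        'type': 'Opaque',
        'data': {
            key: base64.b64encode(value.encode('utf-8')).decode('ascii')
            for key, value in values.items()
        },
    }


# 'example-password' is a placeholder value, not a real credential.
manifest = secret_manifest('my-secret', {'password': 'example-password'})
print(json.dumps(manifest, indent=2))
```

The printed JSON can be saved to a file and applied with kubectl, which accepts JSON manifests as well as YAML.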

Secret: As mounted volume

from kfp import dsl
from kfp import kubernetes

@dsl.component
def print_secret():
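    # each key of the Secret is mounted as a file under mount_path;
    # 'password' is assumed to be a key of my-secret
    with open('/mnt/my_vol/password') as f: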
        print(f.read())

@dsl.pipeline
def pipeline():
    task = print_secret()
    kubernetes.use_secret_as_volume(task,
                                    secret_name='my-secret',
                                    mount_path='/mnt/my_vol')
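
When a Secret is mounted as a volume, Kubernetes exposes each key as a file under the mount path, so a component may want to read every key rather than one fixed file. The following is a minimal sketch of such a reader, exercised against a temporary directory that stands in for the mount. The helper name read_secret_dir and the sample key and value are hypothetical.

```python
import os
import tempfile


def read_secret_dir(mount_path: str) -> dict:
    """Map each secret key (file name) under mount_path to its text contents."""
    values = {}
    for entry in sorted(os.listdir(mount_path)):
        full_path = os.path.join(mount_path, entry)
        # Skip hidden bookkeeping entries and anything that is not a regular file.
        if entry.startswith('.') or not os.path.isfile(full_path):
            continue
        with open(full_path, encoding='utf-8') as f:
            values[entry] = f.read()
    return values


# Local stand-in for the mounted volume: one file per secret key,
# plus a hidden directory that the reader should ignore.
with tempfile.TemporaryDirectory() as fake_mount:
    with open(os.path.join(fake_mount, 'password'), 'w', encoding='utf-8') as f:
        f.write('example-password')
    os.mkdir(os.path.join(fake_mount, '..data'))
    secrets = read_secret_dir(fake_mount)

print(sorted(secrets))
```

Inside a component, the same function body would be called with the mount_path passed to use_secret_as_volume, here '/mnt/my_vol'.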

PersistentVolumeClaim: Dynamically create PVC, mount, then delete

from kfp import dsl
from kfp import kubernetes

@dsl.component
def make_data():
    with open('/data/file.txt', 'w') as f:
        f.write('my data')

@dsl.component
def read_data():
    with open('/reused_data/file.txt') as f:
        print(f.read())

@dsl.pipeline
def my_pipeline():
    pvc1 = kubernetes.CreatePVC(
        # can also use pvc_name instead of pvc_name_suffix to use a pre-existing PVC
        pvc_name_suffix='-my-pvc',
        access_modes=['ReadWriteOnce'],
        size='5Gi',
        storage_class_name='standard',
    )

    task1 = make_data()
    # normally task sequencing is handled by data exchange via component inputs/outputs
    # but since data is exchanged via volume, we need to call .after explicitly to sequence tasks
    task2 = read_data().after(task1)

    kubernetes.mount_pvc(
        task1,
        pvc_name=pvc1.outputs['name'],
        mount_path='/data',
    )
    kubernetes.mount_pvc(
        task2,
        pvc_name=pvc1.outputs['name'],
        mount_path='/reused_data',
    )

    # wait to delete the PVC until after task2 completes
    delete_pvc1 = kubernetes.DeletePVC(
        pvc_name=pvc1.outputs['name']).after(task2)
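
The need for the explicit .after(task1) call can be illustrated locally. In this sketch a temporary directory stands in for the PVC, and plain Python functions stand in for the two components. They are not KFP components, and the mount paths become a parameter purely for illustration.

```python
import os
import tempfile


def make_data(mount_path: str) -> None:
    """Stand-in for the writer component: write a file to the shared volume."""
    with open(os.path.join(mount_path, 'file.txt'), 'w') as f:
        f.write('my data')


def read_data(mount_path: str) -> str:
    """Stand-in for the reader component: read the file back from the volume."""
    with open(os.path.join(mount_path, 'file.txt')) as f:
        return f.read()


with tempfile.TemporaryDirectory() as shared_volume:
    # Reading before the writer has run fails; .after(task1) prevents this ordering.
    try:
        read_data(shared_volume)
        read_before_write_failed = False
    except FileNotFoundError:
        read_before_write_failed = True

    make_data(shared_volume)
    result = read_data(shared_volume)

print(read_before_write_failed, result)
```

Because the two tasks share no component inputs or outputs, nothing else in the pipeline definition tells the backend that the reader depends on the writer.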