diff --git a/OWNERS b/OWNERS index a519301e..f106766c 100644 --- a/OWNERS +++ b/OWNERS @@ -2,3 +2,4 @@ approvers: - jinchihe - js-ts - akgraner + - kbthu diff --git a/codes/parallelism.py b/codes/parallelism.py new file mode 100644 index 00000000..4c47f780 --- /dev/null +++ b/codes/parallelism.py @@ -0,0 +1,20 @@ +import kfp +from kfp import dsl +from kfp.components import create_component_from_func + +@create_component_from_func +def print_op(message: str): + print(message) + +@dsl.pipeline( + name="Kubeflow pipeline parallel example", + description="Demonstrate the parallel pods of Kubeflow pipeline." +) +def parallelism(): + op1 = print_op("training first model") + op2 = print_op("training second model") + dsl.get_pipeline_conf().set_parallelism(2) + +if __name__ == "__main__": + import kfp.compiler as compiler + compiler.Compiler().compile(parallelism, (__file__ + ".yaml").replace(".py", "")) diff --git a/codes/volume_parallel.py b/codes/volume_parallel.py new file mode 100644 index 00000000..988212e3 --- /dev/null +++ b/codes/volume_parallel.py @@ -0,0 +1,62 @@ + +import kfp +from kfp import dsl + +def create_pv(): + return dsl.VolumeOp( + name="create_pv", + resource_name="kfp-pvc", + size="1Gi", + modes=dsl.VOLUME_MODE_RWO + ) + + +def parallel_1(vol_name: str): + cop = dsl.ContainerOp( + name='generate_data', + image='bash:5.1', + command=['sh', '-c'], + arguments=['echo 1 | tee /mnt/out1.txt'] + ) + cop.container.set_image_pull_policy('IfNotPresent') + cop.add_pvolumes({'/mnt': dsl.PipelineVolume(pvc=vol_name)}) + return cop + + +def parallel_2(vol_name: str): + cop = dsl.ContainerOp( + name='generate_data', + image='bash:5.1', + command=['sh', '-c'], + arguments=['echo 2 | tee /mnt/out2.txt'] + ) + cop.container.set_image_pull_policy('IfNotPresent') + cop.add_pvolumes({'/mnt': dsl.PipelineVolume(pvc=vol_name)}) + return cop + + +def parallel_3(vol_name: str): + cop = dsl.ContainerOp( + name='generate_data', + image='bash:5.1', + command=['sh', '-c'], + arguments=['echo 3 | tee /mnt/out3.txt'] + ) + cop.container.set_image_pull_policy('IfNotPresent') + cop.add_pvolumes({'/mnt': dsl.PipelineVolume(pvc=vol_name)}) + return cop + + +@dsl.pipeline( + name="Kubeflow volume parallel example", + description="Demonstrate the use case of volume on Kubeflow pipeline.") +def volume_parallel(): + vop = create_pv() + cop1 = parallel_1(vop.outputs["name"]).after(vop) + cop2 = parallel_2(vop.outputs["name"]).after(vop) + cop3 = parallel_3(vop.outputs["name"]).after(vop) + + +if __name__ == "__main__": + import kfp.compiler as compiler + compiler.Compiler().compile(volume_parallel, __file__ + ".yaml") \ No newline at end of file