{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Run the following command to install the Kubeflow Pipelines SDK. If you run this command in a Jupyter notebook, restart the kernel after installing the SDK."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%pip install kfp --upgrade\n",
    "# To install the Tekton compiler, uncomment the line below\n",
    "# %pip install kfp_tekton"
   ]
  },
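  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Optionally, verify the installation after restarting the kernel. This sanity check is not part of the original example; it only assumes the `kfp` package exposes a `__version__` attribute."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Sanity check (not in the original example): confirm the SDK imports after the kernel restart\n",
    "import kfp\n",
    "print(kfp.__version__)"
   ]
  },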
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Import packages"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "import time\n",
    "import yaml\n",
    "\n",
    "import kfp\n",
    "import kfp.components as comp\n",
    "import kfp.dsl as dsl"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "SPARK_COMPLETED_STATE = \"COMPLETED\"\n",
    "SPARK_APPLICATION_KIND = \"sparkapplications\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def get_spark_job_definition():\n",
    "    \"\"\"\n",
    "    Read the Spark Operator job manifest file, make the job name unique by\n",
    "    adding the current epoch time, and return the resulting dictionary.\n",
    "    :return: dictionary defining the Spark job\n",
    "    \"\"\"\n",
    "    # Read the manifest file\n",
    "    with open(\"spark-job.yaml\", \"r\") as stream:\n",
    "        spark_job_manifest = yaml.safe_load(stream)\n",
    "\n",
    "    # Interpolate the epoch time into the job name\n",
    "    epoch = int(time.time())\n",
    "    spark_job_manifest[\"metadata\"][\"name\"] = spark_job_manifest[\"metadata\"][\"name\"].format(epoch=epoch)\n",
    "\n",
    "    return spark_job_manifest"
   ]
  },
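  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For reference, `spark-job.yaml` is a Spark Operator `SparkApplication` manifest whose `metadata.name` contains an `{epoch}` placeholder for the interpolation above. The sketch below is a hypothetical minimal manifest, not the file shipped with this example; field values such as the image, application file, and namespace are assumptions.\n",
    "\n",
    "```yaml\n",
    "apiVersion: sparkoperator.k8s.io/v1beta2\n",
    "kind: SparkApplication\n",
    "metadata:\n",
    "  name: spark-job-{epoch}\n",
    "  namespace: default  # assumed namespace\n",
    "spec:\n",
    "  type: Python\n",
    "  mode: cluster\n",
    "  image: gcr.io/spark-operator/spark-py:v3.1.1  # assumed image\n",
    "  mainApplicationFile: local:///opt/spark/examples/src/main/python/pi.py  # assumed application\n",
    "  sparkVersion: \"3.1.1\"\n",
    "  driver:\n",
    "    cores: 1\n",
    "    memory: 512m\n",
    "    serviceAccount: spark\n",
    "  executor:\n",
    "    cores: 1\n",
    "    instances: 1\n",
    "    memory: 512m\n",
    "```"
   ]
  },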
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def print_op(msg):\n",
    "    \"\"\"\n",
    "    Op to print a message.\n",
    "    \"\"\"\n",
    "    return dsl.ContainerOp(\n",
    "        name=\"Print message.\",\n",
    "        image=\"alpine:3.6\",\n",
    "        command=[\"echo\", msg],\n",
    "    )"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@dsl.graph_component  # the graph_component decorator is used to annotate recursive functions\n",
    "def graph_component_spark_app_status(input_application_name):\n",
    "    k8s_get_op = comp.load_component_from_file(\"k8s-get-component.yaml\")\n",
    "    check_spark_application_status_op = k8s_get_op(\n",
    "        name=input_application_name,\n",
    "        kind=SPARK_APPLICATION_KIND\n",
    "    )\n",
    "    # Disable caching so every iteration reads the live application state\n",
    "    check_spark_application_status_op.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n",
    "\n",
    "    time.sleep(5)\n",
    "    # Recurse until the SparkApplication reaches the COMPLETED state\n",
    "    with dsl.Condition(check_spark_application_status_op.outputs[\"applicationstate\"] != SPARK_COMPLETED_STATE):\n",
    "        graph_component_spark_app_status(check_spark_application_status_op.outputs[\"name\"])"
   ]
  },
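  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`k8s-get-component.yaml` (and the `k8s-apply-component.yaml` used below) are reusable component specifications shipped alongside this notebook. Judging only from how it is invoked here, the get component takes `name` and `kind` inputs and produces `name` and `applicationstate` outputs. A hypothetical skeleton in the KFP component format might look like the following; the container implementation is an assumption, not the shipped file.\n",
    "\n",
    "```yaml\n",
    "name: Kubernetes get\n",
    "inputs:\n",
    "- {name: name, type: String}\n",
    "- {name: kind, type: String}\n",
    "outputs:\n",
    "- {name: name, type: String}\n",
    "- {name: applicationstate, type: String}\n",
    "implementation:\n",
    "  container:\n",
    "    image: bitnami/kubectl  # assumed image\n",
    "    command: [sh, -c, '...']  # the shipped component defines the real command\n",
    "```"
   ]
  },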
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@dsl.pipeline(\n",
    "    name=\"Spark Operator job pipeline\",\n",
    "    description=\"Spark Operator job pipeline\"\n",
    ")\n",
    "def spark_job_pipeline():\n",
    "    # Load the Spark job manifest\n",
    "    spark_job_definition = get_spark_job_definition()\n",
    "\n",
    "    # Load the Kubernetes apply component\n",
    "    k8s_apply_op = comp.load_component_from_file(\"k8s-apply-component.yaml\")\n",
    "\n",
    "    # Execute the apply command\n",
    "    spark_job_op = k8s_apply_op(object=json.dumps(spark_job_definition))\n",
    "\n",
    "    # Fetch the Spark job name\n",
    "    spark_job_name = spark_job_op.outputs[\"name\"]\n",
    "\n",
    "    # Disable caching for the apply operator\n",
    "    spark_job_op.execution_options.caching_strategy.max_cache_staleness = \"P0D\"\n",
    "\n",
    "    # Poll the application status until it completes\n",
    "    spark_application_status_op = graph_component_spark_app_status(spark_job_name)\n",
    "    spark_application_status_op.after(spark_job_op)\n",
    "\n",
    "    print_message = print_op(f\"Job {spark_job_name} is completed.\")\n",
    "    print_message.after(spark_application_status_op)\n",
    "    print_message.execution_options.caching_strategy.max_cache_staleness = \"P0D\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Compile and run your pipeline\n",
    "\n",
    "After defining the pipeline in Python as described in the preceding section, use one of the following options to compile the pipeline and submit it to the Kubeflow Pipelines service.\n",
    "\n",
    "#### Option 1: Compile and then upload in UI\n",
    "\n",
    "1. Run the following to compile your pipeline and save it as `spark_job_pipeline.yaml`."
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For Argo (default)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create the pipeline file for the Argo backend (the default).\n",
    "# If you use Tekton, use the block below instead.\n",
    "if __name__ == \"__main__\":\n",
    "    # Compile the pipeline\n",
    "    import kfp.compiler as compiler\n",
    "    import logging\n",
    "    logging.basicConfig(level=logging.INFO)\n",
    "    pipeline_func = spark_job_pipeline\n",
    "    pipeline_filename = pipeline_func.__name__ + \".yaml\"\n",
    "    compiler.Compiler().compile(pipeline_func, pipeline_filename)\n",
    "    logging.info(f\"Generated pipeline file: {pipeline_filename}.\")"
   ]
  },
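  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "If you keep the pipeline in a standalone Python file instead of a notebook, the KFP v1 SDK also ships a `dsl-compile` command-line tool that performs the same compilation. A minimal sketch; the input file name is an assumption for illustration.\n",
    "\n",
    "```shell\n",
    "dsl-compile --py spark_job_pipeline.py --output spark_job_pipeline.yaml\n",
    "```"
   ]
  },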
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "For Tekton"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Uncomment the block below to create the pipeline file for Tekton\n",
    "\n",
    "# if __name__ == \"__main__\":\n",
    "#     from kfp_tekton.compiler import TektonCompiler\n",
    "#     import logging\n",
    "#     logging.basicConfig(level=logging.INFO)\n",
    "#     pipeline_func = spark_job_pipeline\n",
    "#     pipeline_filename = pipeline_func.__name__ + \".yaml\"\n",
    "#     TektonCompiler().compile(pipeline_func, pipeline_filename)\n",
    "#     logging.info(f\"Generated pipeline file: {pipeline_filename}.\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "2. Upload and run your `spark_job_pipeline.yaml` using the Kubeflow Pipelines user interface.\n",
    "See the guide to [getting started with the UI][quickstart].\n",
    "\n",
    "[quickstart]: https://www.kubeflow.org/docs/components/pipelines/overview/quickstart"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Option 2: Run the pipeline using the Kubeflow Pipelines SDK client\n",
    "\n",
    "1. Create an instance of the [`kfp.Client` class][kfp-client] following the steps in [connecting to Kubeflow Pipelines using the SDK client][connect-api].\n",
    "\n",
    "[kfp-client]: https://kubeflow-pipelines.readthedocs.io/en/latest/source/kfp.client.html#kfp.Client\n",
    "[connect-api]: https://www.kubeflow.org/docs/components/pipelines/sdk/connect-api"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "client = kfp.Client()  # change the arguments according to your deployment"
   ]
  },
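  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "When the notebook does not run inside the cluster, `kfp.Client` typically needs an explicit `host`. A minimal sketch, assuming you have port-forwarded the `ml-pipeline-ui` service to local port 8080; the URL is an assumption for illustration."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Example (assumed host): connect through a local port-forward such as\n",
    "#   kubectl port-forward -n kubeflow svc/ml-pipeline-ui 8080:80\n",
    "# client = kfp.Client(host=\"http://localhost:8080\")"
   ]
  },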
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# The pipeline takes no parameters, so pass empty arguments\n",
    "client.create_run_from_pipeline_func(spark_job_pipeline, arguments={})"
   ]
  },
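  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "`create_run_from_pipeline_func` also accepts optional `run_name` and `experiment_name` parameters to label the run and group it under an experiment. A minimal sketch, with assumed names for illustration:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional variant (assumed run/experiment names, not part of the original example):\n",
    "# client.create_run_from_pipeline_func(\n",
    "#     spark_job_pipeline,\n",
    "#     arguments={},\n",
    "#     run_name=\"spark-job-run\",\n",
    "#     experiment_name=\"spark-operator-demo\",\n",
    "# )"
   ]
  }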
 ],
 "metadata": {
  "language_info": {
   "name": "python"
  },
  "orig_nbformat": 4
 },
 "nbformat": 4,
 "nbformat_minor": 2
}