{ "cells": [ { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "# Copyright (c) Facebook, Inc. and its affiliates.\n", "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# http://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# KubeFlow Pipelines : HPO with AX - Pytorch Cifar10 Image classification\n", "\n", "In this example, we train a Pytorch Lightning model to using image classification cifar10 dataset. A parent run will be created during the training process,which would dump the baseline model and relevant parameters,metrics and model along with its summary,subsequently followed by a set of nested child runs, which will dump the trial results. The best parameters would be dumped into the parent run once the experiments are completed.\n", "\n", "This notebook shows PyTorch CIFAR10 end-to-end classification example using Kubeflow Pipelines. \n", "\n", "An example notebook that demonstrates how to:\n", "\n", "* Get different tasks needed for the pipeline\n", "* Create a Kubeflow pipeline\n", "* Include Pytorch KFP components to preprocess, train, visualize and deploy the model in the pipeline\n", "* Submit a job for execution\n", "* Query(prediction and explain) the final deployed model\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## import the necessary packages" ] }, { "cell_type": "code", "execution_count": 1, "metadata": {}, "outputs": [], "source": [ "! 
pip uninstall -y kfp\n", "! pip install --no-cache-dir kfp ax-platform" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'1.6.4'" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import kfp\n", "import json\n", "import os\n", "from kfp.onprem import use_k8s_secret\n", "from kfp import components\n", "from kfp.components import load_component_from_file, load_component_from_url, func_to_container_op, InputPath\n", "from kfp import dsl\n", "from kfp import compiler\n", "\n", "import numpy as np\n", "import logging\n", "\n", "from ax.service.ax_client import AxClient\n", "\n", "kfp.__version__" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Enter your gateway and the auth token\n", "[Use this Chrome extension to get the auth token](https://chrome.google.com/webstore/detail/editthiscookie/fngmhnnpilhplaeedifhccceomclgfbg?hl=en)\n", "\n", "![image.png](./image.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Update values for the ingress gateway and auth session" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "INGRESS_GATEWAY='http://istio-ingressgateway.istio-system.svc.cluster.local'\n", "AUTH=\"\" \n", "NAMESPACE=\"kubeflow-user-example-com\"\n", "COOKIE=\"authservice_session=\"+AUTH\n", "EXPERIMENT=\"Default\"\n", "dist_volume = 'dist-vol'\n", "volume_mount_path =\"/model\"\n", "results_path = volume_mount_path+\"/results.json\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Set the Log bucket and Tensorboard Image" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "MINIO_ENDPOINT=\"http://minio-service.kubeflow:9000\"\n", "LOG_BUCKET=\"mlpipeline\"\n", "TENSORBOARD_IMAGE=\"public.ecr.aws/pytorch-samples/tboard:latest\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Set the client and create 
the experiment" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "client = kfp.Client(host=INGRESS_GATEWAY+\"/pipeline\", cookies=COOKIE)" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/html": [ "Experiment details." ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "{'created_at': datetime.datetime(2021, 6, 21, 13, 13, 6, tzinfo=tzlocal()),\n", " 'description': None,\n", " 'id': 'ba9b7266-2b1c-4729-afcd-be808c25c5af',\n", " 'name': 'Default',\n", " 'resource_references': [{'key': {'id': 'kubeflow-user-example-com',\n", " 'type': 'NAMESPACE'},\n", " 'name': None,\n", " 'relationship': 'OWNER'}],\n", " 'storage_state': 'STORAGESTATE_AVAILABLE'}" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "client.create_experiment(EXPERIMENT)\n", "experiments = client.list_experiments(namespace=NAMESPACE)\n", "my_experiment = experiments.experiments[0]\n", "my_experiment" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Set the Inference parameters" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "DEPLOY_NAME=\"torchserve\"\n", "MODEL_NAME=\"cifar10\"\n", "ISVC_NAME=DEPLOY_NAME+\".\"+NAMESPACE+\".\"+\"example.com\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Load the the components yaml files for setting up the components" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "! 
python utils/generate_templates.py cifar10/ax_template_mapping.json" ] }, { "cell_type": "code", "execution_count": 12, "metadata": {}, "outputs": [], "source": [ "prepare_tensorboard_op = load_component_from_file(\"yaml/tensorboard_component.yaml\")\n", "\n", "generate_trails_op = components.load_component_from_file(\n", " \"yaml/ax_generate_trials_component.yaml\"\n", ")\n", "\n", "complete_trails_op = components.load_component_from_file(\n", " \"yaml/ax_complete_trials_component.yaml\"\n", ")\n", "\n", "get_keys_op = components.load_component_from_file(\n", " \"../../../components/contrib/json/Get_keys/component.yaml\"\n", ")\n", "\n", "get_element_op = components.load_component_from_file(\n", " \"../../../components/contrib/json/Get_element_by_key/component.yaml\"\n", ")\n", "prep_op = components.load_component_from_file(\n", " \"yaml/preprocess_component.yaml\"\n", ")\n", "\n", "# Uncomment hpo inputs in component yaml\n", "train_op = components.load_component_from_file(\n", " \"yaml/ax_train_component.yaml\"\n", ")\n", "\n", "kubernetes_create_pvc_op = load_component_from_file(\"../../../components/contrib/kubernetes/Create_PersistentVolumeClaim/component.yaml\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from kubernetes.client.models import V1Volume, V1PersistentVolumeClaimVolumeSource\n", "def create_dist_pipeline():\n", " kubernetes_create_pvc_op(name=dist_volume, storage_size= \"5Gi\", namespace=NAMESPACE)\n", "\n", "create_volume_run = client.create_run_from_pipeline_func(create_dist_pipeline, arguments={})\n", "create_volume_run.wait_for_run_completion()" ] }, { "cell_type": "code", "execution_count": 13, "metadata": { "tags": [] }, "outputs": [], "source": [ "parameters = [\n", " {\"name\": \"lr\", \"type\": \"range\", \"bounds\": [1e-4, 0.2], \"log_scale\": True},\n", " {\"name\": \"weight_decay\", \"type\": \"range\", \"bounds\": [1e-4, 1e-2]},\n", " {\"name\": \"eps\", \"type\": \"range\", 
\"bounds\": [1e-8, 1e-2]},\n", " ]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Define the pipeline" ] }, { "cell_type": "code", "execution_count": 14, "metadata": { "tags": [] }, "outputs": [], "source": [ "@dsl.pipeline(\n", " name=\"AX Hpo\", description=\"Estimating best parameters using AX\"\n", ")\n", "def pytorch_ax_hpo( # pylint: disable=too-many-arguments\n", " minio_endpoint=MINIO_ENDPOINT,\n", " log_bucket=LOG_BUCKET,\n", " log_dir=f\"tensorboard/logs/{dsl.RUN_ID_PLACEHOLDER}\",\n", " mar_path=f\"mar/{dsl.RUN_ID_PLACEHOLDER}/model-store\",\n", " config_prop_path=f\"mar/{dsl.RUN_ID_PLACEHOLDER}/config\",\n", " model_uri=f\"s3://mlpipeline/mar/{dsl.RUN_ID_PLACEHOLDER}\",\n", " best_params=f\"hpo/{dsl.RUN_ID_PLACEHOLDER}\",\n", " tf_image=TENSORBOARD_IMAGE,\n", " deploy=DEPLOY_NAME,\n", " isvc_name=ISVC_NAME,\n", " model=MODEL_NAME,\n", " namespace=NAMESPACE,\n", " confusion_matrix_log_dir=f\"confusion_matrix/{dsl.RUN_ID_PLACEHOLDER}/\",\n", " checkpoint_dir=\"checkpoint_dir/cifar10\",\n", " input_req=INPUT_REQUEST,\n", " cookie=COOKIE,\n", " total_trials=2,\n", " ingress_gateway=INGRESS_GATEWAY,\n", "):\n", " \n", " \"\"\"This method defines the pipeline tasks and operations\"\"\"\n", " pod_template_spec = json.dumps({\n", " \"spec\": {\n", " \"containers\": [{\n", " \"env\": [\n", " {\n", " \"name\": \"AWS_ACCESS_KEY_ID\",\n", " \"valueFrom\": {\n", " \"secretKeyRef\": {\n", " \"name\": \"mlpipeline-minio-artifact\",\n", " \"key\": \"accesskey\",\n", " }\n", " },\n", " },\n", " {\n", " \"name\": \"AWS_SECRET_ACCESS_KEY\",\n", " \"valueFrom\": {\n", " \"secretKeyRef\": {\n", " \"name\": \"mlpipeline-minio-artifact\",\n", " \"key\": \"secretkey\",\n", " }\n", " },\n", " },\n", " {\n", " \"name\": \"AWS_REGION\",\n", " \"value\": \"minio\"\n", " },\n", " {\n", " \"name\": \"S3_ENDPOINT\",\n", " \"value\": f\"{minio_endpoint}\",\n", " },\n", " {\n", " \"name\": \"S3_USE_HTTPS\",\n", " \"value\": \"0\"\n", " },\n", " {\n", " \"name\": 
\"S3_VERIFY_SSL\",\n", " \"value\": \"0\"\n", " },\n", " ]\n", " }]\n", " }\n", " })\n", "\n", " prepare_tb_task = prepare_tensorboard_op(\n", " log_dir_uri=f\"s3://{log_bucket}/{log_dir}\",\n", " image=tf_image,\n", " pod_template_spec=pod_template_spec,\n", " ).set_display_name(\"Visualization\")\n", "\n", " prep_task = (\n", " prep_op().after(prepare_tb_task).set_display_name(\"Preprocess & Transform\")\n", " )\n", "\n", " gen_trials_task = generate_trails_op(total_trials, parameters, 'test-accuracy').after(prep_task).set_display_name(\"AX Generate Trials\")\n", " \n", " get_keys_task = get_keys_op(gen_trials_task.outputs[\"trial_parameters\"]).after(gen_trials_task).set_display_name(\"Get Keys of Trials\")\n", " \n", " confusion_matrix_url = f\"minio://{log_bucket}/{confusion_matrix_log_dir}\"\n", " script_args = f\"model_name=resnet.pth,\" \\\n", " f\"confusion_matrix_url={confusion_matrix_url}\"\n", " ptl_args = f\"max_epochs=1, profiler=pytorch\"\n", "\n", " with dsl.ParallelFor(get_keys_task.outputs[\"keys\"]) as item:\n", " get_element_task = get_element_op(gen_trials_task.outputs[\"trial_parameters\"], item).after(get_keys_task).set_display_name(\"Get Element from key\")\n", " train_task = (\n", " train_op(\n", " trial_id=item,\n", " input_data=prep_task.outputs[\"output_data\"],\n", " script_args=script_args,\n", " model_parameters=get_element_task.outputs[\"output\"],\n", " ptl_arguments=ptl_args,\n", " results=results_path\n", " ).add_pvolumes({volume_mount_path: dsl.PipelineVolume(pvc=dist_volume)}).after(get_element_task).set_display_name(\"Training\")\n", " # For allocating resources, uncomment below lines\n", " # .set_memory_request('600M')\n", " # .set_memory_limit('1200M')\n", " # .set_cpu_request('700m')\n", " # .set_cpu_limit('1400m')\n", " # For GPU uncomment below line and set GPU limit and node selector\n", " # .set_gpu_limit(1).add_node_selector_constraint('cloud.google.com/gke-accelerator','nvidia-tesla-p4')\n", " )\n", " \n", " 
complete_trials_task = complete_trails_op(gen_trials_task.outputs[\"client\"], results_path).add_pvolumes({volume_mount_path: dsl.PipelineVolume(pvc=dist_volume)}).after(train_task).set_display_name(\"AX Complete Trials\")\n", "\n", " dsl.get_pipeline_conf().add_op_transformer(\n", " use_k8s_secret(\n", " secret_name=\"mlpipeline-minio-artifact\",\n", " k8s_secret_key_to_env={\n", " \"secretkey\": \"MINIO_SECRET_KEY\",\n", " \"accesskey\": \"MINIO_ACCESS_KEY\",\n", " },\n", " )\n", " )\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Compile the pipeline" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "compiler.Compiler().compile(pytorch_ax_hpo, 'pytorch.tar.gz', type_check=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Execute the pipeline" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [ { "data": { "text/html": [ "Run details." ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "run = client.run_pipeline(my_experiment.id, 'pytorch_ax_hpo', 'pytorch.tar.gz')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Viewing results\n", "\n", "Wait for the pipeline execution to complete. A sample pipeline is shown below." ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "![](screenshots/ax-hpo-pipeline.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Click on the \"AX Complete Trials\" component. The best hyperparameters appear in its Input/Output tab, as shown below." ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "![](screenshots/ax-complete-trials.png)" ] }, { "attachments": {}, "cell_type": "markdown", "metadata": {}, "source": [ "![](screenshots/ax-best-parameters.png)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.2" } }, "nbformat": 4, "nbformat_minor": 4 }
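The parallel "Training" tasks and the "AX Complete Trials" step in this pipeline communicate only through `results.json` on the shared PVC (`results_path`). The component internals are not shown in the notebook, so the sketch below is a hypothetical stdlib illustration of that handoff pattern, not the actual component code: the file schema (`{trial_index: {"test-accuracy": ...}}`) and the helper names `record_trial_result` / `best_trial` are assumptions.

```python
import json
import os
import tempfile

# Hypothetical sketch of the results.json handoff: each training task
# merges its trial metric into a shared JSON file on the mounted volume,
# and the completion step reads all entries back and picks the best trial.

def record_trial_result(results_path, trial_index, accuracy):
    """Merge one trial's metric into the shared results file."""
    results = {}
    if os.path.exists(results_path):
        with open(results_path) as f:
            results = json.load(f)
    results[str(trial_index)] = {"test-accuracy": accuracy}
    with open(results_path, "w") as f:
        json.dump(results, f)

def best_trial(results_path):
    """Return (trial_index, accuracy) of the best-performing trial."""
    with open(results_path) as f:
        results = json.load(f)
    idx, metrics = max(results.items(), key=lambda kv: kv[1]["test-accuracy"])
    return int(idx), metrics["test-accuracy"]

if __name__ == "__main__":
    path = os.path.join(tempfile.mkdtemp(), "results.json")
    record_trial_result(path, 0, 0.71)
    record_trial_result(path, 1, 0.78)
    print(best_trial(path))  # (1, 0.78)
```

Because every writer rereads the file before merging, trials that finish at different times accumulate into one document; the real components would additionally feed these metrics back to the `AxClient` (via `complete_trial`) so it can report the best parameters.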