{ "cells": [ { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Copyright (c) Facebook, Inc. and its affiliates.\n", "# Licensed under the Apache License, Version 2.0 (the \"License\");\n", "# you may not use this file except in compliance with the License.\n", "# You may obtain a copy of the License at\n", "#\n", "# http://www.apache.org/licenses/LICENSE-2.0\n", "#\n", "# Unless required by applicable law or agreed to in writing, software\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\n", "# See the License for the specific language governing permissions and\n", "# limitations under the License." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# KubeFlow Pipelines : Pytorch Cifar10 Image classification\n", "\n", "This notebook shows PyTorch CIFAR10 end-to-end classification example using Kubeflow Pipelines. \n", "\n", "An example notebook that demonstrates how to:\n", "\n", "* Get different tasks needed for the pipeline\n", "* Create a Kubeflow pipeline\n", "* Include Pytorch KFP components to preprocess, train, visualize and deploy the model in the pipeline\n", "* Submit a job for execution\n", "* Query(prediction and explain) the final deployed model\n" ] }, { "cell_type": "code", "execution_count": 29, "metadata": {}, "outputs": [], "source": [ "! pip uninstall -y kfp\n", "! pip install --no-cache-dir kfp" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## import the necessary packages" ] }, { "cell_type": "code", "execution_count": 2, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'1.8.12'" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "import kfp\n", "import json\n", "import os\n", "from kfp.onprem import use_k8s_secret\n", "from kfp import components\n", "from kfp.components import load_component_from_file, load_component_from_url\n", "from kfp import dsl\n", "from kfp import compiler\n", "\n", "import numpy as np\n", "import logging\n", "\n", "kfp.__version__" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Enter your gateway and the auth token\n", "[Use this extension on chrome to get token]( https://chrome.google.com/webstore/detail/editthiscookie/fngmhnnpilhplaeedifhccceomclgfbg?hl=en)\n", "\n", "![image.png](./image.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Update values for the ingress gateway and auth session" ] }, { "cell_type": "code", "execution_count": 3, "metadata": {}, "outputs": [], "source": [ "INGRESS_GATEWAY='http://istio-ingressgateway.istio-system.svc.cluster.local'\n", "AUTH=\"\" \n", "NAMESPACE=\"kubeflow-user-example-com\"\n", "COOKIE=\"authservice_session=\"+AUTH\n", "EXPERIMENT=\"Default\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Set the Log bucket and Tensorboard Image" ] }, { "cell_type": "code", "execution_count": 4, "metadata": {}, "outputs": [], "source": [ "MINIO_ENDPOINT=\"http://minio-service.kubeflow:9000\"\n", "LOG_BUCKET=\"mlpipeline\"\n", "TENSORBOARD_IMAGE=\"public.ecr.aws/pytorch-samples/tboard:latest\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Set the client and create the experiment" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "client = kfp.Client(host=INGRESS_GATEWAY+\"/pipeline\", cookies=COOKIE)" ] }, { "cell_type": "code", "execution_count": 6, "metadata": {}, "outputs": [ { "data": { "text/html": [ "Experiment details." ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/plain": [ "{'created_at': datetime.datetime(2022, 4, 21, 9, 45, 22, tzinfo=tzlocal()),\n", " 'description': None,\n", " 'id': 'b4bee8c3-381b-42a0-9494-bc81eb9aa359',\n", " 'name': 'Default',\n", " 'resource_references': [{'key': {'id': 'kubeflow-user-example-com',\n", " 'type': 'NAMESPACE'},\n", " 'name': None,\n", " 'relationship': 'OWNER'}],\n", " 'storage_state': 'STORAGESTATE_AVAILABLE'}" ] }, "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ "client.create_experiment(EXPERIMENT)\n", "experiments = client.list_experiments(namespace=NAMESPACE)\n", "my_experiment = experiments.experiments[0]\n", "my_experiment" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Set the Inference parameters" ] }, { "cell_type": "code", "execution_count": 7, "metadata": {}, "outputs": [], "source": [ "DEPLOY_NAME=\"torchserve\"\n", "MODEL_NAME=\"cifar10\"\n", "ISVC_NAME=DEPLOY_NAME+\".\"+NAMESPACE+\".\"+\"example.com\"\n", "INPUT_REQUEST=\"https://raw.githubusercontent.com/kubeflow/pipelines/master/samples/contrib/pytorch-samples/cifar10/input.json\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Load the the components yaml files for setting up the components" ] }, { "cell_type": "code", "execution_count": 8, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Processing prediction_component.yaml\n", "Processing ax_complete_trials_component.yaml\n", "Processing preprocess_component.yaml\n", "Processing train_component.yaml\n", "Processing tensorboard_component.yaml\n", "Processing ax_generate_trials_component.yaml\n", "Processing minio_component.yaml\n", "Processing copy_component.yaml\n", "Processing ax_train_component.yaml\n" ] } ], "source": [ "! python utils/generate_templates.py cifar10/template_mapping.json" ] }, { "cell_type": "code", "execution_count": 9, "metadata": {}, "outputs": [], "source": [ "prepare_tensorboard_op = load_component_from_file(\"yaml/tensorboard_component.yaml\")\n", "\n", "prep_op = components.load_component_from_file(\n", " \"yaml/preprocess_component.yaml\"\n", ")\n", "\n", "train_op = components.load_component_from_file(\n", " \"yaml/train_component.yaml\"\n", ")\n", "deploy_op = load_component_from_file(\n", " \"../../../components/kserve/component.yaml\"\n", ")\n", "\n", "pred_op = load_component_from_file(\"yaml/prediction_component.yaml\")\n", "\n", "minio_op = components.load_component_from_file(\n", " \"yaml/minio_component.yaml\"\n", ")\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Define the pipeline" ] }, { "cell_type": "code", "execution_count": 16, "metadata": {}, "outputs": [], "source": [ "\n", "@dsl.pipeline(\n", " name=\"Training Cifar10 pipeline\", description=\"Cifar 10 dataset pipeline\"\n", ")\n", "def pytorch_cifar10( # pylint: disable=too-many-arguments\n", " minio_endpoint=MINIO_ENDPOINT,\n", " log_bucket=LOG_BUCKET,\n", " log_dir=f\"tensorboard/logs/{dsl.RUN_ID_PLACEHOLDER}\",\n", " mar_path=f\"mar/{dsl.RUN_ID_PLACEHOLDER}/model-store\",\n", " config_prop_path=f\"mar/{dsl.RUN_ID_PLACEHOLDER}/config\",\n", " model_uri=f\"s3://mlpipeline/mar/{dsl.RUN_ID_PLACEHOLDER}\",\n", " tf_image=TENSORBOARD_IMAGE,\n", " deploy=DEPLOY_NAME,\n", " isvc_name=ISVC_NAME,\n", " model=MODEL_NAME,\n", " namespace=NAMESPACE,\n", " confusion_matrix_log_dir=f\"confusion_matrix/{dsl.RUN_ID_PLACEHOLDER}/\",\n", " checkpoint_dir=\"checkpoint_dir/cifar10\",\n", " input_req=INPUT_REQUEST,\n", " cookie=COOKIE,\n", " ingress_gateway=INGRESS_GATEWAY,\n", "):\n", " def sleep_op(seconds):\n", " \"\"\"Sleep for a while.\"\"\"\n", " return dsl.ContainerOp(\n", " name=\"Sleep \" + str(seconds) + \" seconds\",\n", " image=\"python:alpine3.6\",\n", " command=[\"sh\", \"-c\"],\n", " arguments=[\n", " 'python -c \"import time; time.sleep($0)\"',\n", " str(seconds)\n", " ],\n", " )\n", "\n", " \"\"\"This method defines the pipeline tasks and operations\"\"\"\n", " pod_template_spec = json.dumps({\n", " \"spec\": {\n", " \"containers\": [{\n", " \"env\": [\n", " {\n", " \"name\": \"AWS_ACCESS_KEY_ID\",\n", " \"valueFrom\": {\n", " \"secretKeyRef\": {\n", " \"name\": \"mlpipeline-minio-artifact\",\n", " \"key\": \"accesskey\",\n", " }\n", " },\n", " },\n", " {\n", " \"name\": \"AWS_SECRET_ACCESS_KEY\",\n", " \"valueFrom\": {\n", " \"secretKeyRef\": {\n", " \"name\": \"mlpipeline-minio-artifact\",\n", " \"key\": \"secretkey\",\n", " }\n", " },\n", " },\n", " {\n", " \"name\": \"AWS_REGION\",\n", " \"value\": \"minio\"\n", " },\n", " {\n", " \"name\": \"S3_ENDPOINT\",\n", " \"value\": f\"{minio_endpoint}\",\n", " },\n", " {\n", " \"name\": \"S3_USE_HTTPS\",\n", " \"value\": \"0\"\n", " },\n", " {\n", " \"name\": \"S3_VERIFY_SSL\",\n", " \"value\": \"0\"\n", " },\n", " ]\n", " }]\n", " }\n", " })\n", "\n", " prepare_tb_task = prepare_tensorboard_op(\n", " log_dir_uri=f\"s3://{log_bucket}/{log_dir}\",\n", " image=tf_image,\n", " pod_template_spec=pod_template_spec,\n", " ).set_display_name(\"Visualization\")\n", "\n", " prep_task = (\n", " prep_op().after(prepare_tb_task\n", " ).set_display_name(\"Preprocess & Transform\")\n", " )\n", " confusion_matrix_url = f\"minio://{log_bucket}/{confusion_matrix_log_dir}\"\n", " script_args = f\"model_name=resnet.pth,\" \\\n", " f\"confusion_matrix_url={confusion_matrix_url}\"\n", " # For GPU, set number of devices and strategy type\n", " ptl_args = f\"max_epochs=1, devices=0, strategy=None, profiler=pytorch\"\n", " train_task = (\n", " train_op(\n", " input_data=prep_task.outputs[\"output_data\"],\n", " script_args=script_args,\n", " ptl_arguments=ptl_args\n", " ).after(prep_task).set_display_name(\"Training\")\n", " # For allocating resources, uncomment below lines\n", " # .set_memory_request('600M')\n", " # .set_memory_limit('1200M')\n", " # .set_cpu_request('700m')\n", " # .set_cpu_limit('1400m')\n", " # For GPU uncomment below line and set GPU limit and node selector\n", " # .set_gpu_limit(1).add_node_selector_constraint('cloud.google.com/gke-accelerator','nvidia-tesla-p4')\n", " )\n", "\n", "\n", " (\n", " minio_op(\n", " bucket_name=\"mlpipeline\",\n", " folder_name=log_dir,\n", " input_path=train_task.outputs[\"tensorboard_root\"],\n", " filename=\"\",\n", " ).after(train_task).set_display_name(\"Tensorboard Events Pusher\")\n", " )\n", "\n", " (\n", " minio_op(\n", " bucket_name=\"mlpipeline\",\n", " folder_name=checkpoint_dir,\n", " input_path=train_task.outputs[\"checkpoint_dir\"],\n", " filename=\"\",\n", " ).after(train_task).set_display_name(\"checkpoint_dir Pusher\")\n", " )\n", "\n", " minio_mar_upload = (\n", " minio_op(\n", " bucket_name=\"mlpipeline\",\n", " folder_name=mar_path,\n", " input_path=train_task.outputs[\"checkpoint_dir\"],\n", " filename=\"cifar10_test.mar\",\n", " ).after(train_task).set_display_name(\"Mar Pusher\")\n", " )\n", "\n", " (\n", " minio_op(\n", " bucket_name=\"mlpipeline\",\n", " folder_name=config_prop_path,\n", " input_path=train_task.outputs[\"checkpoint_dir\"],\n", " filename=\"config.properties\",\n", " ).after(train_task).set_display_name(\"Conifg Pusher\")\n", " )\n", "\n", " model_uri = str(model_uri)\n", " # pylint: disable=unused-variable\n", " isvc_yaml = \"\"\"\n", " apiVersion: \"serving.kserve.io/v1beta1\"\n", " kind: \"InferenceService\"\n", " metadata:\n", " name: {}\n", " namespace: {}\n", " spec:\n", " predictor:\n", " serviceAccountName: sa\n", " pytorch:\n", " protocolVersion: v2\n", " storageUri: {}\n", " resources:\n", " requests: \n", " cpu: 4\n", " memory: 8Gi\n", " limits:\n", " cpu: 4\n", " memory: 8Gi\n", " \"\"\".format(deploy, namespace, model_uri)\n", "\n", " # For GPU inference use below yaml with gpu count and accelerator\n", " gpu_count = \"1\"\n", " accelerator = \"nvidia-tesla-p4\"\n", " isvc_gpu_yaml = \"\"\"# pylint: disable=unused-variable\n", " apiVersion: \"serving.kserve.io/v1beta1\"\n", " kind: \"InferenceService\"\n", " metadata:\n", " name: {}\n", " namespace: {}\n", " spec:\n", " predictor:\n", " serviceAccountName: sa\n", " pytorch:\n", " protocolVersion: v2\n", " storageUri: {}\n", " resources:\n", " requests: \n", " cpu: 16\n", " memory: 24Gi\n", " limits:\n", " cpu: 16\n", " memory: 24Gi\n", " nvidia.com/gpu: {}\n", " nodeSelector:\n", " cloud.google.com/gke-accelerator: {}\n", "\"\"\".format(deploy, namespace, model_uri, gpu_count, accelerator)\n", " # Update inferenceservice_yaml for GPU inference\n", " deploy_task = (\n", " deploy_op(action=\"apply\", inferenceservice_yaml=isvc_yaml\n", " ).after(minio_mar_upload).set_display_name(\"Deployer\")\n", " )\n", " # Wait here for model to be loaded in torchserve for inference\n", " sleep_task = sleep_op(60).after(deploy_task).set_display_name(\"Sleep\")\n", " # Make Inference request\n", " pred_task = (\n", " pred_op(\n", " host_name=isvc_name,\n", " input_request=input_req,\n", " cookie=cookie,\n", " url=ingress_gateway,\n", " model=model,\n", " inference_type=\"infer\",\n", " ).after(sleep_task).set_display_name(\"Prediction\")\n", " )\n", " (\n", " pred_op(\n", " host_name=isvc_name,\n", " input_request=input_req,\n", " cookie=cookie,\n", " url=ingress_gateway,\n", " model=model,\n", " inference_type=\"explain\",\n", " ).after(pred_task).set_display_name(\"Explanation\")\n", " )\n", "\n", " dsl.get_pipeline_conf().add_op_transformer(\n", " use_k8s_secret(\n", " secret_name=\"mlpipeline-minio-artifact\",\n", " k8s_secret_key_to_env={\n", " \"secretkey\": \"MINIO_SECRET_KEY\",\n", " \"accesskey\": \"MINIO_ACCESS_KEY\",\n", " },\n", " )\n", " )\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Compile the pipeline" ] }, { "cell_type": "code", "execution_count": 17, "metadata": {}, "outputs": [], "source": [ "compiler.Compiler().compile(pytorch_cifar10, 'pytorch.tar.gz', type_check=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Execute the pipeline" ] }, { "cell_type": "code", "execution_count": 18, "metadata": {}, "outputs": [ { "data": { "text/html": [ "Run details." ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "run = client.run_pipeline(my_experiment.id, 'pytorch-cifar10', 'pytorch.tar.gz')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Wait for inference service below to go to READY True state" ] }, { "cell_type": "code", "execution_count": 14, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "NAME URL READY PREV LATEST PREVROLLEDOUTREVISION LATESTREADYREVISION AGE\n", "bertserve http://bertserve.kubeflow-user-example-com.example.com True 100 bertserve-predictor-default-00003 3h11m\n", "torchserve http://torchserve.kubeflow-user-example-com.example.com True 100 torchserve-predictor-default-00001 5m19s\n" ] } ], "source": [ "!kubectl get isvc $DEPLOY" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Get the Inference service name" ] }, { "cell_type": "code", "execution_count": 15, "metadata": {}, "outputs": [ { "data": { "text/plain": [ "'torchserve.kubeflow-user-example-com.example.com'" ] }, "execution_count": 15, "metadata": {}, "output_type": "execute_result" } ], "source": [ "INFERENCE_SERVICE_LIST = ! kubectl get isvc {DEPLOY_NAME} -n {NAMESPACE} -o json | python3 -c \"import sys, json; print(json.load(sys.stdin)['status']['url'])\"| tr -d '\"' | cut -d \"/\" -f 3\n", "INFERENCE_SERVICE_NAME = INFERENCE_SERVICE_LIST[0]\n", "INFERENCE_SERVICE_NAME" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Use the deployed model for prediction request and save the output into a json" ] }, { "cell_type": "code", "execution_count": 21, "metadata": {}, "outputs": [], "source": [ "!python cifar10/tobytes.py cifar10/kitten.png" ] }, { "cell_type": "code", "execution_count": 30, "metadata": {}, "outputs": [], "source": [ "!curl -v -H \"Host: $INFERENCE_SERVICE_NAME\" -H \"Cookie: $COOKIE\" \"$INGRESS_GATEWAY/v2/models/$MODEL_NAME/infer\" -d @./cifar10/kitten.json > cifar10_prediction_output.json" ] }, { "cell_type": "code", "execution_count": 26, "metadata": { "scrolled": true }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "{\"id\": \"fda5a0a4-fe03-4476-8258-fee5017e6c50\", \"model_name\": \"cifar10_test\", \"model_version\": \"1\", \"outputs\": [{\"name\": \"predict\", \"shape\": [], \"datatype\": \"BYTES\", \"data\": [{\"truck\": 0.7257930040359497, \"car\": 0.12065636366605759, \"plane\": 0.0643853172659874, \"frog\": 0.030459348112344742, \"ship\": 0.01999029517173767}]}]}" ] } ], "source": [ "! cat cifar10_prediction_output.json" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Use the deployed model for explain request and save the output into a json" ] }, { "cell_type": "code", "execution_count": 31, "metadata": {}, "outputs": [], "source": [ "!curl -v -H \"Host: $INFERENCE_SERVICE_NAME\" -H \"Cookie: $COOKIE\" \"$INGRESS_GATEWAY/v2/models/$MODEL_NAME/explain\" -d @./cifar10/kitten.json > cifar10_explanation_output.json" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Clean up\n", "#### Delete Viewers, Inference Services and Completed pods" ] }, { "cell_type": "code", "execution_count": 25, "metadata": { "scrolled": true }, "outputs": [], "source": [ "! kubectl delete --all isvc -n $NAMESPACE" ] }, { "cell_type": "code", "execution_count": 26, "metadata": {}, "outputs": [], "source": [ "! kubectl delete pod --field-selector=status.phase==Succeeded -n $NAMESPACE" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.10" } }, "nbformat": 4, "nbformat_minor": 4 }