{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Text classification: hyperparameter tuning and training pipeline\n",
    "\n",
    "This notebook defines a Kubeflow pipeline that launches a Katib experiment, converts its result into command-line arguments, and creates a TFJob that trains with them. The pipeline is submitted through `kfp_tekton`."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "from string import Template\n",
    "\n",
    "import kfp\n",
    "import kfp.dsl as dsl\n",
    "from kfp import components\n",
    "from kfp.components import func_to_container_op\n",
    "from kfp_tekton import TektonClient"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Name used for both the Katib experiment and the final TFJob.\n",
    "model_name = \"text-classification\"\n",
    "# Namespace in which the experiment and the TFJob are created.\n",
    "user_namespace = \"kubeflow-mailsforyashj\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def add_istio_annotation(op):\n",
    "    \"\"\"Annotate the pod of ``op`` so that no Istio sidecar is injected, and return ``op``.\"\"\"\n",
    "    op.add_pod_annotation(name='sidecar.istio.io/inject', value='false')\n",
    "    return op"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def convert_katib_results(katib_results) -> str:\n",
    "    \"\"\"Turn the Katib launcher output into a command-line argument string.\n",
    "\n",
    "    NOTE(review): assumes the launcher emits a JSON list of\n",
    "    ``{\"name\": ..., \"value\": ...}`` objects (the best parameter assignments);\n",
    "    confirm against the launcher component version in use.\n",
    "\n",
    "    Args:\n",
    "        katib_results: JSON text produced by the Katib launcher step.\n",
    "\n",
    "    Returns:\n",
    "        Space-separated ``<name>=<value>`` pairs, e.g. ``--epochs=2 --learning_rate=0.03``.\n",
    "    \"\"\"\n",
    "    # Imported locally because the function body is executed in its own container.\n",
    "    import json\n",
    "\n",
    "    best_params = json.loads(katib_results)\n",
    "    return \" \".join(\"%s=%s\" % (hp[\"name\"], hp[\"value\"]) for hp in best_params)\n",
    "\n",
    "\n",
    "convert_op = func_to_container_op(convert_katib_results)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@dsl.pipeline(\n",
    "    name=\"End to end pipeline\",\n",
    "    description=\"An end to end example including hyperparameter tuning\"\n",
    ")\n",
    "def text_classification_pipeline(name=model_name, namespace=user_namespace, step=4000):\n",
    "    \"\"\"Tune hyperparameters with Katib, then train a TFJob with the best ones.\n",
    "\n",
    "    Args:\n",
    "        name: Name given to both the Katib experiment and the final TFJob.\n",
    "        namespace: Namespace in which both resources are created.\n",
    "        step: Passed to the TFJob template substitution; the template has no\n",
    "            step placeholder, so it currently has no effect.\n",
    "    \"\"\"\n",
    "    # step 1: create a Katib experiment to tune hyperparameters\n",
    "    objectiveConfig = {\n",
    "        \"type\": \"maximize\",\n",
    "        \"goal\": 0.6,\n",
    "        \"objectiveMetricName\": \"val_accuracy\",\n",
    "    }\n",
    "    algorithmConfig = {\"algorithmName\" : \"bayesianoptimization\"}\n",
    "    parameters = [\n",
    "        {\"name\": \"--epochs\", \"parameterType\": \"int\", \"feasibleSpace\": {\"min\": \"1\",\"max\": \"2\"}},\n",
    "        {\"name\": \"--learning_rate\", \"parameterType\": \"double\", \"feasibleSpace\": {\"min\": \"0.01\", \"max\": \"0.05\"}},\n",
    "    ]\n",
    "\n",
    "    # Chief and Worker run the same container, so the replica spec is built once.\n",
    "    trial_replica = {\n",
    "        \"replicas\": 1,\n",
    "        \"restartPolicy\": \"OnFailure\",\n",
    "        \"template\": {\n",
    "            \"spec\": {\n",
    "                \"containers\": [\n",
    "                    {\n",
    "                        # Kubernetes does not run `command` through a shell, so a single\n",
    "                        # string with spaces would be taken as the executable path.\n",
    "                        \"command\": [\"sh\", \"-c\"],\n",
    "                        # Katib renders the Go template once per trial, appending one\n",
    "                        # \"<name>=<value>\" argument per hyperparameter.\n",
    "                        \"args\": [\n",
    "                            \"python3 /app/text_classification_rnn.py {{- with .HyperParameters}} {{- range .}} {{.Name}}={{.Value}} {{- end}} {{- end}}\"\n",
    "                        ],\n",
    "                        \"image\": \"gcr.io/gsoc-kf-example/tf_2_text_classification:1.4\",\n",
    "                        \"name\": \"tensorflow\"\n",
    "                    }\n",
    "                ]\n",
    "            }\n",
    "        }\n",
    "    }\n",
    "    rawTemplate = {\n",
    "        \"apiVersion\": \"kubeflow.org/v1\",\n",
    "        \"kind\": \"TFJob\",\n",
    "        \"metadata\": {\n",
    "            \"name\": \"{{.Trial}}\",\n",
    "            \"namespace\": \"{{.NameSpace}}\"\n",
    "        },\n",
    "        \"spec\": {\n",
    "            \"tfReplicaSpecs\": {\n",
    "                \"Chief\": trial_replica,\n",
    "                \"Worker\": trial_replica\n",
    "            }\n",
    "        }\n",
    "    }\n",
    "\n",
    "    trialTemplate = {\n",
    "        \"goTemplate\": {\n",
    "            \"rawTemplate\": json.dumps(rawTemplate)\n",
    "        }\n",
    "    }\n",
    "\n",
    "    metricsCollectorSpec = {\n",
    "        \"collector\": {\n",
    "            \"kind\": \"StdOut\"\n",
    "        }\n",
    "    }\n",
    "\n",
    "    # NOTE(review): this URL tracks the master branch, so the component interface can\n",
    "    # change underneath the notebook; consider pinning it to a specific commit.\n",
    "    katib_experiment_launcher_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kubeflow/katib-launcher/component.yaml')\n",
    "    op1 = katib_experiment_launcher_op(\n",
    "        experiment_name=name,\n",
    "        experiment_namespace=namespace,\n",
    "        parallel_trial_count=3,\n",
    "        max_trial_count=12,\n",
    "        objective=str(objectiveConfig),\n",
    "        algorithm=str(algorithmConfig),\n",
    "        trial_template=str(trialTemplate),\n",
    "        parameters=str(parameters),\n",
    "        metrics_collector=str(metricsCollectorSpec),\n",
    "        delete_finished_experiment=False)\n",
    "\n",
    "    # step 2: create a TFJob to train the model with the best hyperparameters found by Katib.\n",
    "    # $name, $namespace and $args are filled in by Template.substitute below.\n",
    "    tfjobjson_template = Template(\"\"\"\n",
    "{\n",
    "  \"apiVersion\": \"kubeflow.org/v1\",\n",
    "  \"kind\": \"TFJob\",\n",
    "  \"metadata\": {\n",
    "    \"name\": \"$name\",\n",
    "    \"namespace\": \"$namespace\",\n",
    "    \"annotations\": {\n",
    "      \"sidecar.istio.io/inject\": \"false\"\n",
    "    }\n",
    "  },\n",
    "  \"spec\": {\n",
    "    \"tfReplicaSpecs\": {\n",
    "      \"Chief\": {\n",
    "        \"replicas\": 1,\n",
    "        \"restartPolicy\": \"OnFailure\",\n",
    "        \"template\": {\n",
    "          \"metadata\": {\n",
    "            \"annotations\": {\n",
    "              \"sidecar.istio.io/inject\": \"false\"\n",
    "            }\n",
    "          },\n",
    "          \"spec\": {\n",
    "            \"containers\": [\n",
    "              {\n",
    "                \"command\": [\"sh\", \"-c\"],\n",
    "                \"args\": [\"python3 /app/text_classification_rnn.py $args\"],\n",
    "                \"image\": \"gcr.io/gsoc-kf-example/tf_2_text_classification:1.4\",\n",
    "                \"name\": \"tensorflow\"\n",
    "              }\n",
    "            ]\n",
    "          }\n",
    "        }\n",
    "      },\n",
    "      \"Worker\": {\n",
    "        \"replicas\": 1,\n",
    "        \"restartPolicy\": \"OnFailure\",\n",
    "        \"template\": {\n",
    "          \"metadata\": {\n",
    "            \"annotations\": {\n",
    "              \"sidecar.istio.io/inject\": \"false\"\n",
    "            }\n",
    "          },\n",
    "          \"spec\": {\n",
    "            \"containers\": [\n",
    "              {\n",
    "                \"command\": [\"sh\", \"-c\"],\n",
    "                \"args\": [\"python3 /app/text_classification_rnn.py $args\"],\n",
    "                \"image\": \"gcr.io/gsoc-kf-example/tf_2_text_classification:1.4\",\n",
    "                \"name\": \"tensorflow\"\n",
    "              }\n",
    "            ]\n",
    "          }\n",
    "        }\n",
    "      }\n",
    "    }\n",
    "  }\n",
    "}\n",
    "\"\"\")\n",
    "\n",
    "    # Convert the launcher output into \"<name>=<value>\" arguments for the training command.\n",
    "    op2 = convert_op(op1.output)\n",
    "    tfjobjson = tfjobjson_template.substitute(\n",
    "            {'args': op2.output,\n",
    "             'name': name,\n",
    "             'namespace': namespace,\n",
    "             'step': step,\n",
    "            })\n",
    "\n",
    "    tfjob = json.loads(tfjobjson)\n",
    "\n",
    "    train = dsl.ResourceOp(\n",
    "        name=\"train\",\n",
    "        k8s_resource=tfjob,\n",
    "        success_condition='status.replicaStatuses.Worker.succeeded==1,status.replicaStatuses.Chief.succeeded==1'\n",
    "    )\n",
    "    # Apply the Istio annotation to every op in the pipeline.\n",
    "    dsl.get_pipeline_conf().add_op_transformer(add_istio_annotation)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#Assign permission to Kubeflow Pipeline Service Account\n",
    "# NOTE(review): this grants cluster-admin to the pipeline service account, which is very broad.\n",
    "!kubectl create clusterrolebinding $user_namespace-admin --clusterrole cluster-admin --serviceaccount=kubeflow:pipeline-run"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Specify Kubeflow Pipeline Host\n",
    "host=None\n",
    "\n",
    "# Submit a pipeline run\n",
    "TektonClient(host=host).create_run_from_pipeline_func(text_classification_pipeline, arguments={})"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#Cleanup your created jobs\n",
    "!kubectl delete experiment -n $user_namespace $model_name\n",
    "!kubectl delete tfjob -n $user_namespace $model_name"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.9"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}