examples/tensorflow_cuj/text_classification/tekton-pipeline-with-python...

271 lines
9.1 KiB
Plaintext

{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"from string import Template\n",
"\n",
"import kfp\n",
"import kfp.dsl as dsl\n",
"from kfp import components\n",
"from kfp.components import func_to_container_op\n",
"from kfp_tekton import TektonClient"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Namespace the Katib experiment and the TFJob are created in (and deleted from).\n",
"user_namespace = \"kubeflow-mailsforyashj\"\n",
"# Name shared by the Katib experiment and the final TFJob.\n",
"model_name = \"text-classification\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def add_istio_annotation(op):\n",
"    \"\"\"Mark a pipeline op's pod so that no Istio sidecar is injected.\n",
"\n",
"    Sets the ``sidecar.istio.io/inject`` pod annotation to ``'false'`` on ``op``\n",
"    and returns the same ``op``, which lets the function be registered as an\n",
"    op transformer on the pipeline configuration.\n",
"    \"\"\"\n",
"    annotation_key, annotation_value = 'sidecar.istio.io/inject', 'false'\n",
"    op.add_pod_annotation(name=annotation_key, value=annotation_value)\n",
"    return op"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def convert_katib_results(katib_results) -> str:\n",
"    \"\"\"Convert the Katib launcher output into command-line flags.\n",
"\n",
"    Assumes ``katib_results`` is a JSON list of objects with ``name`` and\n",
"    ``value`` keys (the best hyperparameters) -- TODO confirm against the\n",
"    katib-launcher component version that is actually loaded.\n",
"\n",
"    Returns the pairs formatted as ``name=value`` and joined by single spaces.\n",
"    \"\"\"\n",
"    # NOTE(review): imported inside the function on the assumption that\n",
"    # func_to_container_op packages only the function body -- confirm.\n",
"    import json\n",
"\n",
"    best_params = json.loads(katib_results)\n",
"    return \" \".join(\"%s=%s\" % (hp[\"name\"], hp[\"value\"]) for hp in best_params)\n",
"\n",
"\n",
"# Container component that reshapes the Katib result for the training step.\n",
"convert_op = func_to_container_op(convert_katib_results)\n",
"\n",
"\n",
"@dsl.pipeline(\n",
"    name=\"End to end pipeline\",\n",
"    description=\"An end to end example including hyperparameter tuning\"\n",
")\n",
"def text_classification_pipeline(name=model_name, namespace=user_namespace, step=4000):\n",
"    \"\"\"Tune hyperparameters with Katib, then train a TFJob with the best ones.\n",
"\n",
"    Args:\n",
"        name: Used as the Katib experiment name and as the final TFJob name.\n",
"        namespace: Namespace of the Katib experiment and of the final TFJob.\n",
"        step: Kept for interface compatibility; the TFJob template below has\n",
"            no placeholder for it, so it is not forwarded to the training job.\n",
"    \"\"\"\n",
"    training_command = \"python3 /app/text_classification_rnn.py\"\n",
"    training_image = \"gcr.io/gsoc-kf-example/tf_2_text_classification:1.4\"\n",
"\n",
"    # step 1: create a Katib experiment to tune hyperparameters\n",
"    objectiveConfig = {\n",
"        \"type\": \"maximize\",\n",
"        \"goal\": 0.6,\n",
"        \"objectiveMetricName\": \"val_accuracy\",\n",
"    }\n",
"    algorithmConfig = {\"algorithmName\": \"bayesianoptimization\"}\n",
"    parameters = [\n",
"        {\"name\": \"--epochs\", \"parameterType\": \"int\", \"feasibleSpace\": {\"min\": \"1\", \"max\": \"2\"}},\n",
"        {\"name\": \"--learning_rate\", \"parameterType\": \"double\", \"feasibleSpace\": {\"min\": \"0.01\", \"max\": \"0.05\"}},\n",
"    ]\n",
"\n",
"    # Chief and Worker trials run the same container, so the spec is written once.\n",
"    # The command line is one string containing spaces; it is passed to a shell\n",
"    # because a container command entry is used as the executable name verbatim.\n",
"    trialReplicaSpec = {\n",
"        \"replicas\": 1,\n",
"        \"restartPolicy\": \"OnFailure\",\n",
"        \"template\": {\n",
"            \"spec\": {\n",
"                \"containers\": [\n",
"                    {\n",
"                        \"command\": [\"sh\", \"-c\"],\n",
"                        \"args\": [\n",
"                            training_command\n",
"                            + \" {{- with .HyperParameters}} {{- range .}} {{.Name}}={{.Value}} {{- end}} {{- end}}\"\n",
"                        ],\n",
"                        \"image\": training_image,\n",
"                        \"name\": \"tensorflow\",\n",
"                    }\n",
"                ]\n",
"            }\n",
"        },\n",
"    }\n",
"    rawTemplate = {\n",
"        \"apiVersion\": \"kubeflow.org/v1\",\n",
"        \"kind\": \"TFJob\",\n",
"        \"metadata\": {\n",
"            \"name\": \"{{.Trial}}\",\n",
"            \"namespace\": \"{{.NameSpace}}\",\n",
"        },\n",
"        \"spec\": {\n",
"            \"tfReplicaSpecs\": {\n",
"                \"Chief\": trialReplicaSpec,\n",
"                \"Worker\": trialReplicaSpec,\n",
"            }\n",
"        },\n",
"    }\n",
"\n",
"    trialTemplate = {\n",
"        \"goTemplate\": {\n",
"            \"rawTemplate\": json.dumps(rawTemplate)\n",
"        }\n",
"    }\n",
"\n",
"    metricsCollectorSpec = {\n",
"        \"collector\": {\n",
"            \"kind\": \"StdOut\"\n",
"        }\n",
"    }\n",
"\n",
"    # NOTE(review): the component is loaded from the master branch, so its\n",
"    # interface can change between runs -- consider pinning the URL to a commit.\n",
"    katib_experiment_launcher_op = components.load_component_from_url('https://raw.githubusercontent.com/kubeflow/pipelines/master/components/kubeflow/katib-launcher/component.yaml')\n",
"    op1 = katib_experiment_launcher_op(\n",
"        experiment_name=name,\n",
"        experiment_namespace=namespace,\n",
"        parallel_trial_count=3,\n",
"        max_trial_count=12,\n",
"        objective=str(objectiveConfig),\n",
"        algorithm=str(algorithmConfig),\n",
"        trial_template=str(trialTemplate),\n",
"        parameters=str(parameters),\n",
"        metrics_collector=str(metricsCollectorSpec),\n",
"        delete_finished_experiment=False)\n",
"\n",
"    # step 2: turn the Katib result into command-line flags for the trainer\n",
"    op2 = convert_op(op1.output)\n",
"\n",
"    # step 3: create a TFJob to train the model with the flags produced in step 2\n",
"    tfjobjson_template = Template(\"\"\"\n",
"{\n",
"  \"apiVersion\": \"kubeflow.org/v1\",\n",
"  \"kind\": \"TFJob\",\n",
"  \"metadata\": {\n",
"    \"name\": \"$name\",\n",
"    \"namespace\": \"$namespace\",\n",
"    \"annotations\": {\n",
"      \"sidecar.istio.io/inject\": \"false\"\n",
"    }\n",
"  },\n",
"  \"spec\": {\n",
"    \"tfReplicaSpecs\": {\n",
"      \"Chief\": {\n",
"        \"replicas\": 1,\n",
"        \"restartPolicy\": \"OnFailure\",\n",
"        \"template\": {\n",
"          \"metadata\": {\n",
"            \"annotations\": {\n",
"              \"sidecar.istio.io/inject\": \"false\"\n",
"            }\n",
"          },\n",
"          \"spec\": {\n",
"            \"containers\": [\n",
"              {\n",
"                \"command\": [\"sh\", \"-c\"],\n",
"                \"args\": [\"$command $args\"],\n",
"                \"image\": \"$image\",\n",
"                \"name\": \"tensorflow\"\n",
"              }\n",
"            ]\n",
"          }\n",
"        }\n",
"      },\n",
"      \"Worker\": {\n",
"        \"replicas\": 1,\n",
"        \"restartPolicy\": \"OnFailure\",\n",
"        \"template\": {\n",
"          \"metadata\": {\n",
"            \"annotations\": {\n",
"              \"sidecar.istio.io/inject\": \"false\"\n",
"            }\n",
"          },\n",
"          \"spec\": {\n",
"            \"containers\": [\n",
"              {\n",
"                \"command\": [\"sh\", \"-c\"],\n",
"                \"args\": [\"$command $args\"],\n",
"                \"image\": \"$image\",\n",
"                \"name\": \"tensorflow\"\n",
"              }\n",
"            ]\n",
"          }\n",
"        }\n",
"      }\n",
"    }\n",
"  }\n",
"}\n",
"\"\"\")\n",
"\n",
"    # substitute() stringifies each value, so pipeline parameters and the\n",
"    # converter output are embedded in the JSON text as their str() form.\n",
"    tfjobjson = tfjobjson_template.substitute(\n",
"        {'args': op2.output,\n",
"         'command': training_command,\n",
"         'image': training_image,\n",
"         'name': name,\n",
"         'namespace': namespace,\n",
"         })\n",
"\n",
"    tfjob = json.loads(tfjobjson)\n",
"\n",
"    train = dsl.ResourceOp(\n",
"        name=\"train\",\n",
"        k8s_resource=tfjob,\n",
"        success_condition='status.replicaStatuses.Worker.succeeded==1,status.replicaStatuses.Chief.succeeded==1'\n",
"    )\n",
"    # Apply the Istio annotation to every op of the pipeline.\n",
"    dsl.get_pipeline_conf().add_op_transformer(add_istio_annotation)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Assign permission to the Kubeflow Pipeline service account.\n",
"# NOTE(review): cluster-admin is a very broad role -- consider a narrower one on shared clusters.\n",
"!kubectl create clusterrolebinding {user_namespace}-admin --clusterrole=cluster-admin --serviceaccount=kubeflow:pipeline-run"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Specify Kubeflow Pipeline Host (None is passed through to TektonClient unchanged)\n",
"host = None\n",
"\n",
"# Submit a pipeline run; TektonClient is imported in the notebook's import cell\n",
"TektonClient(host=host).create_run_from_pipeline_func(text_classification_pipeline, arguments={})"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Clean up the Katib experiment and the TFJob created by this notebook\n",
"!kubectl delete experiment --namespace {user_namespace} {model_name}\n",
"!kubectl delete tfjob --namespace {user_namespace} {model_name}"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "Python 3",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.9"
}
},
"nbformat": 4,
"nbformat_minor": 4
}