In [None]:
import json

import kfp
import kfp.dsl as dsl
from kfp import components
from kfp.components import func_to_container_op
from kfp_tekton import TektonClient

In [None]:
# Defaults for the pipeline's `name` / `namespace` arguments. The shell cells below
# also interpolate these names (`$user_namespace`, `$model_name`).
model_name, user_namespace = "text-classification", "kubeflow-mailsforyashj"

In [None]:
def add_istio_annotation(op):
    """Mark the pod that runs ``op`` so Istio does not inject a sidecar into it.

    Intended for use as a pipeline op transformer. It sets the pod annotation
    ``sidecar.istio.io/inject: "false"`` and hands the same op back.

    Args:
        op: a pipeline op exposing ``add_pod_annotation(name=..., value=...)``.

    Returns:
        The same ``op`` object, so the transformer can be chained.
    """
    annotated_op = op
    annotated_op.add_pod_annotation(value='false', name='sidecar.istio.io/inject')
    return annotated_op

In [None]:
# Container image used by both the Katib trials and the final training job.
TRAINING_IMAGE = "gcr.io/gsoc-kf-example/tf_2_text_classification:1.4"
# Training entry point inside TRAINING_IMAGE; hyperparameters are appended as name=value flags.
TRAINING_COMMAND = "python3 /app/text_classification_rnn.py"
# Annotation disabling Istio sidecar injection (same key/value add_istio_annotation applies).
ISTIO_SIDECAR_DISABLED = {"sidecar.istio.io/inject": "false"}
# NOTE(review): this tracks `master`; the katib-launcher component's inputs have changed
# over time, so pin it to a commit/tag whose inputs match the arguments passed below.
KATIB_LAUNCHER_COMPONENT_URL = (
    "https://raw.githubusercontent.com/kubeflow/pipelines/master/"
    "components/kubeflow/katib-launcher/component.yaml"
)


def _tfjob_manifest(name, namespace, shell_command):
    """Return a TFJob manifest (a dict) with one Chief and one Worker replica.

    Both replicas run ``shell_command`` via ``sh -c`` in TRAINING_IMAGE, and Istio
    sidecar injection is disabled on the job and on each replica's pod template.

    Args:
        name: value for ``metadata.name``. This may be a literal, a Katib template
            variable, or a stringified pipeline-parameter placeholder.
        namespace: value for ``metadata.namespace``; it accepts the same forms as ``name``.
        shell_command: full command line. It is wrapped in ``sh -c`` because a container
            ``command`` is exec'd as an argv list, not parsed as a shell string.
    """
    def replica_spec():
        # Built fresh per call so Chief and Worker never share nested dicts.
        return {
            "replicas": 1,
            "restartPolicy": "OnFailure",
            "template": {
                "metadata": {"annotations": dict(ISTIO_SIDECAR_DISABLED)},
                "spec": {
                    "containers": [
                        {
                            "name": "tensorflow",
                            "image": TRAINING_IMAGE,
                            "command": ["sh", "-c", shell_command],
                        }
                    ]
                },
            },
        }

    return {
        "apiVersion": "kubeflow.org/v1",
        "kind": "TFJob",
        "metadata": {
            "name": name,
            "namespace": namespace,
            "annotations": dict(ISTIO_SIDECAR_DISABLED),
        },
        "spec": {"tfReplicaSpecs": {"Chief": replica_spec(), "Worker": replica_spec()}},
    }


def best_params_to_args(experiment_result: str) -> str:
    """Render Katib's best parameter assignments as command-line flags.

    This runs in its own container (via ``func_to_container_op``), so it must be
    self-contained; that is why ``json`` is imported locally.

    Assumes ``experiment_result`` is a JSON list of ``{"name": ..., "value": ...}``
    objects (the launcher's best-parameter output). TODO confirm this against the
    pinned component version.
    Example: ``[{"name": "--epochs", "value": "2"}]`` -> ``"--epochs=2"``.
    """
    import json

    assignments = json.loads(experiment_result)
    return " ".join("{}={}".format(p["name"], p["value"]) for p in assignments)


# Pipeline-op factory: converts the launcher's best-parameter output into an args string.
convert_op = func_to_container_op(best_params_to_args)


@dsl.pipeline(
    name="End to end pipeline",
    description="An end to end example including hyperparameter tuning"
)
def text_classification_pipeline(name=model_name, namespace=user_namespace, step=4000):
    """Tune hyperparameters with Katib, then train a TFJob with the best set found.

    Args:
        name: name of both the Katib experiment and the final TFJob.
        namespace: namespace in which the experiment and the TFJob are created.
        step: not referenced by the training command; kept so callers and run
            arguments that pass it keep working.
    """
    # Step 1: a Katib experiment that tunes the hyperparameters below.
    objective_config = {
        "type": "maximize",
        "goal": 0.6,
        "objectiveMetricName": "val_accuracy",
    }
    algorithm_config = {"algorithmName": "bayesianoptimization"}
    parameters = [
        {"name": "--epochs", "parameterType": "int", "feasibleSpace": {"min": "1", "max": "2"}},
        {"name": "--learning_rate", "parameterType": "double", "feasibleSpace": {"min": "0.01", "max": "0.05"}},
    ]
    # Go template rendered per trial. The HyperParameters range appends
    # " {{.Name}}={{.Value}}" for each tuned parameter.
    trial_command = (
        TRAINING_COMMAND
        + " {{- with .HyperParameters}} {{- range .}} {{.Name}}={{.Value}} {{- end}} {{- end}}"
    )
    trial_template = {
        "goTemplate": {
            "rawTemplate": json.dumps(
                _tfjob_manifest("{{.Trial}}", "{{.NameSpace}}", trial_command)
            )
        }
    }
    metrics_collector_spec = {"collector": {"kind": "StdOut"}}

    katib_experiment_launcher_op = components.load_component_from_url(KATIB_LAUNCHER_COMPONENT_URL)
    # Serialize the specs as JSON (str() would give a Python repr). This assumes the
    # launcher parses these string inputs as JSON/YAML; confirm against the pinned component.
    experiment = katib_experiment_launcher_op(
        experiment_name=name,
        experiment_namespace=namespace,
        parallel_trial_count=3,
        max_trial_count=12,
        objective=json.dumps(objective_config),
        algorithm=json.dumps(algorithm_config),
        trial_template=json.dumps(trial_template),
        parameters=json.dumps(parameters),
        metrics_collector=json.dumps(metrics_collector_spec),
        delete_finished_experiment=False,
    )

    # Step 2: train with the best hyperparameters found by Katib.
    best_args = convert_op(experiment.output)
    # Pipeline params/outputs are embedded via str(); KFP replaces the resulting
    # placeholders when the op runs.
    tfjob = _tfjob_manifest(
        str(name),
        str(namespace),
        TRAINING_COMMAND + " " + str(best_args.output),
    )

    # Constructing the op registers it with the pipeline. The op is done once both
    # the Worker and the Chief replica report a succeeded pod.
    dsl.ResourceOp(
        name="train",
        k8s_resource=tfjob,
        success_condition='status.replicaStatuses.Worker.succeeded==1,status.replicaStatuses.Chief.succeeded==1'
    )
    dsl.get_pipeline_conf().add_op_transformer(add_istio_annotation)

In [None]:
# Assign permissions to the Kubeflow Pipelines service account.
# Binds the `cluster-admin` ClusterRole to the `kubeflow:pipeline-run` service account,
# under the binding name "<user_namespace>-admin" ($user_namespace is expanded by IPython).
# NOTE(review): cluster-admin is far broader than creating Katib Experiments / TFJobs
# requires; consider a namespaced Role instead. Also confirm that `pipeline-run` is the
# runner service account in your install (it may be named `pipeline-runner`).
# NOTE(review): `kubectl create` likely reports AlreadyExists if this cell is re-run.
!kubectl create clusterrolebinding $user_namespace-admin --clusterrole cluster-admin --serviceaccount=kubeflow:pipeline-run

In [None]:
# Kubeflow Pipelines API host; None lets the client pick its default endpoint
# (TODO confirm the default resolves correctly for your deployment).
host = None

# Submit a run of the pipeline using its default arguments.
TektonClient(host=host).create_run_from_pipeline_func(text_classification_pipeline, arguments={})

In [None]:
# Clean up the resources created by a run with default arguments: the Katib experiment
# and the final TFJob are both named `model_name` in `user_namespace`.
!kubectl delete experiment -n $user_namespace $model_name
!kubectl delete tfjob -n $user_namespace $model_name