In [25]:
# Copyright (c) Facebook, Inc. and its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# BERT Pipeline: PyTorch BERT News Classification

This notebook shows an end-to-end PyTorch BERT news classification example using Kubeflow Pipelines.


An example notebook that demonstrates how to:

* Get different tasks needed for the pipeline
* Create a Kubeflow pipeline
* Include PyTorch KFP components to preprocess, train, visualize and deploy the model in the pipeline
* Submit a job for execution
* Query the final deployed model (prediction and explanation)
* Interpret the model's predictions using Captum


In [13]:
! pip uninstall -y kfp
! pip install --no-cache-dir kfp torch captum

In [72]:
# Standard library
import json
import os

# Kubeflow Pipelines SDK (v1)
import kfp
from kfp import compiler
from kfp import components
from kfp import dsl
from kfp.components import InputPath, load_component_from_file, load_component_from_url
from kfp.onprem import use_k8s_secret

# Show the installed SDK version.
kfp.__version__

'1.6.4'

# Enter your gateway and the cookie
[Use this Chrome extension to get the session cookie token](https://chrome.google.com/webstore/detail/editthiscookie/fngmhnnpilhplaeedifhccceomclgfbg?hl=en)

![image.png](./image.png)

## Update values for the ingress gateway and auth session

In [92]:
INGRESS_GATEWAY = 'http://istio-ingressgateway.istio-system.svc.cluster.local'
# Prefer reading the Kubeflow session token from the environment so it does not
# have to be pasted into (and saved with) the notebook; the placeholder remains
# as a fallback for manual editing.
AUTH = os.environ.get("KFP_AUTH_TOKEN", "<enter your token here>")
NAMESPACE = "kubeflow-user-example-com"
COOKIE = "authservice_session=" + AUTH
EXPERIMENT = "Default"

# PVC shared by the dataset copy, the training pods, the MAR restructuring step
# and the KServe model URI; mounted at `volume_mount_path` in those containers.
dist_volume = 'dist-vol'
volume_mount_path = "/model"
dataset_path = volume_mount_path + "/dataset"
checkpoint_dir = volume_mount_path + "/checkpoint"
tensorboard_root = volume_mount_path + "/tensorboard"

## Set the log bucket and TensorBoard image

In [93]:
# MinIO endpoint and bucket that receive the TensorBoard event files.
MINIO_ENDPOINT = "http://minio-service.kubeflow:9000"
LOG_BUCKET = "mlpipeline"

# Image used by the pipeline's "Visualization" (TensorBoard) step.
TENSORBOARD_IMAGE = "public.ecr.aws/pytorch-samples/tboard:latest"

In [94]:
# KFP API client reached through the ingress gateway, authenticated by the session cookie.
client = kfp.Client(
    host=f"{INGRESS_GATEWAY}/pipeline",
    cookies=COOKIE,
)

In [95]:
# create_experiment returns the existing experiment when one with this name is
# already present in the namespace, so use its return value directly rather than
# picking the first entry of list_experiments (which may be a different experiment).
my_experiment = client.create_experiment(EXPERIMENT, namespace=NAMESPACE)
my_experiment

{'created_at': datetime.datetime(2021, 6, 21, 13, 13, 6, tzinfo=tzlocal()),
 'description': None,
 'id': 'ba9b7266-2b1c-4729-afcd-be808c25c5af',
 'name': 'Default',
 'resource_references': [{'key': {'id': 'kubeflow-user-example-com',
                                  'type': 'NAMESPACE'},
                          'name': None,
                          'relationship': 'OWNER'}],
 'storage_state': 'STORAGESTATE_AVAILABLE'}

In [96]:
# Name of the KServe InferenceService created by the pipeline.
DEPLOY_NAME = "bert-dist"
# Model name used in the inference/explain URL paths.
MODEL_NAME = "bert"

In [97]:
# Run the template generator with bert/template_mapping.json.
# NOTE(review): presumably this renders the yaml/*_component.yaml files loaded in the next cell — confirm.
! python utils/generate_templates.py bert/template_mapping.json

In [98]:
# Components defined alongside this sample (yaml/ directory).
prepare_tensorboard_op = load_component_from_file("yaml/tensorboard_component.yaml")
prep_op = load_component_from_file("yaml/preprocess_component.yaml")
# Use GPU image in train component.
# NOTE: train_op is loaded here, but the pipeline below trains through pytorch_job_op.
train_op = load_component_from_file("yaml/train_component.yaml")
minio_op = load_component_from_file("yaml/minio_component.yaml")
cp_op = load_component_from_file("yaml/copy_component.yaml")

# Shared components from the Kubeflow Pipelines repository checkout.
deploy_op = load_component_from_file("../../../components/kserve/component.yaml")
pytorch_job_op = load_component_from_file(
    "../../../components/kubeflow/pytorch-launcher/component.yaml"
)
kubernetes_create_pvc_op = load_component_from_file(
    "../../../components/contrib/kubernetes/Create_PersistentVolumeClaim/component.yaml"
)

In [99]:
from kubernetes.client.models import V1Volume, V1PersistentVolumeClaimVolumeSource


def create_dist_pipeline():
    """One-step pipeline that creates the `dist_volume` PVC (2Gi) in NAMESPACE."""
    kubernetes_create_pvc_op(name=dist_volume, storage_size="2Gi", namespace=NAMESPACE)


create_volume_run = client.create_run_from_pipeline_func(create_dist_pipeline, arguments={})
create_volume_run_detail = create_volume_run.wait_for_run_completion()

# The training pipeline mounts this PVC, so surface a failed creation run here
# instead of letting training fail later. A warning (not an exception) keeps
# re-runs working when the PVC already exists from a previous run.
create_volume_status = create_volume_run_detail.run.status
if (create_volume_status or "").lower() != "succeeded":
    import warnings

    warnings.warn(
        f"PVC creation run finished with status {create_volume_status!r}; "
        f"make sure PVC {dist_volume!r} exists in namespace {NAMESPACE!r} before continuing."
    )

## Define pipeline

In [100]:
def _pytorch_replica_spec(replicas, container_args, gpu_limit):
    """Build a master/worker replica spec for the PyTorchJob launcher component.

    The master and worker replicas of the distributed BERT job run the same
    container, so the spec is generated once here instead of being duplicated.

    Args:
        replicas: Number of pods of this replica type.
        container_args: Arguments passed to ``bert/agnews_classification_pytorch.py``.
        gpu_limit: Value of the container's ``nvidia.com/gpu`` resource limit.

    Returns:
        dict: A replica spec for the ``master_spec`` / ``worker_spec`` arguments.
    """
    minio_secret = "mlpipeline-minio-artifact"
    return {
        "replicas": replicas,
        "imagePullPolicy": "Always",
        "restartPolicy": "OnFailure",
        "template": {
            "metadata": {
                # Keep the Istio sidecar out of the training pods.
                "annotations": {
                    "sidecar.istio.io/inject": "false"
                }
            },
            "spec": {
                "containers": [
                    {
                        "name": "pytorch",
                        "image": "public.ecr.aws/pytorch-samples/kfp_samples:latest-gpu",
                        "command": ["python", "bert/agnews_classification_pytorch.py"],
                        "args": list(container_args),
                        "env": [
                            {
                                "name": "MINIO_ACCESS_KEY",
                                "valueFrom": {
                                    "secretKeyRef": {"name": minio_secret, "key": "accesskey"}
                                },
                            },
                            {
                                "name": "MINIO_SECRET_KEY",
                                "valueFrom": {
                                    "secretKeyRef": {"name": minio_secret, "key": "secretkey"}
                                },
                            },
                        ],
                        "ports": [
                            {"containerPort": 24456, "name": "pytorchjob-port"}
                        ],
                        "resources": {
                            "limits": {"nvidia.com/gpu": gpu_limit}
                        },
                        "volumeMounts": [
                            {"mountPath": volume_mount_path, "name": "model-volume"}
                        ],
                    }
                ],
                "volumes": [
                    {
                        "name": "model-volume",
                        "persistentVolumeClaim": {"claimName": dist_volume},
                    }
                ],
            },
        },
    }


@dsl.pipeline(name="Training pipeline", description="Sample training job test")
def pytorch_bert(
    minio_endpoint=MINIO_ENDPOINT,
    log_bucket=LOG_BUCKET,
    log_dir=f"tensorboard/logs/{dsl.RUN_ID_PLACEHOLDER}",
    confusion_matrix_log_dir=f"confusion_matrix/{dsl.RUN_ID_PLACEHOLDER}/",
    mar_path=f"mar/{dsl.RUN_ID_PLACEHOLDER}/model-store/",
    config_prop_path=f"mar/{dsl.RUN_ID_PLACEHOLDER}/config/",
    model_uri=f"pvc://{dist_volume}/mar/{dsl.RUN_ID_PLACEHOLDER}",
    tf_image=TENSORBOARD_IMAGE,
    deploy=DEPLOY_NAME,
    namespace=NAMESPACE,
    num_samples=1000,
    max_epochs=1,
    gpus=2,
    num_nodes=2
):
    """Preprocess, train (distributed DDP via PyTorchJob), package and deploy BERT.

    Steps: TensorBoard visualization -> preprocess -> copy dataset to the PVC ->
    distributed training -> restructure MAR/config.properties on the PVC ->
    copy and upload TensorBoard logs -> deploy the KServe InferenceService.
    """
    prepare_tb_task = prepare_tensorboard_op(
        log_dir_uri=f"s3://{log_bucket}/{log_dir}",
        image=tf_image,
        pod_template_spec=json.dumps({
            "spec": {
                "containers": [{
                    "env": [
                        {
                            "name": "AWS_ACCESS_KEY_ID",
                            "valueFrom": {
                                "secretKeyRef": {
                                    "name": "mlpipeline-minio-artifact",
                                    "key": "accesskey",
                                }
                            },
                        },
                        {
                            "name": "AWS_SECRET_ACCESS_KEY",
                            "valueFrom": {
                                "secretKeyRef": {
                                    "name": "mlpipeline-minio-artifact",
                                    "key": "secretkey",
                                }
                            },
                        },
                        {
                            "name": "AWS_REGION",
                            "value": "minio"
                        },
                        {
                            "name": "S3_ENDPOINT",
                            "value": f"{minio_endpoint}",
                        },
                        {
                            "name": "S3_USE_HTTPS",
                            "value": "0"
                        },
                        {
                            "name": "S3_VERIFY_SSL",
                            "value": "0"
                        },
                    ]
                }]
            }
        }),
    ).set_display_name("Visualization")

    prep_task = prep_op().after(prepare_tb_task).set_display_name("Preprocess & Transform")
    copy_task = (
        cp_op("true", prep_task.outputs['output_data'], dataset_path, "")
        .add_pvolumes({volume_mount_path: dsl.PipelineVolume(pvc=dist_volume)})
        .after(prep_task)
        .set_display_name("Copy Dataset")
    )
    confusion_matrix_url = f"minio://{log_bucket}/{confusion_matrix_log_dir}"

    # Training arguments shared by the master and the worker replicas.
    training_args = [
        "--dataset_path", dataset_path,
        "--checkpoint_dir", checkpoint_dir,
        "--script_args", f"model_name=bert.pth,num_samples={num_samples}",
        "--tensorboard_root", tensorboard_root,
        "--ptl_args", (
            f"max_epochs={max_epochs},profiler=pytorch,devices={gpus},"
            f"accelerator=gpu,strategy=ddp,num_nodes={num_nodes},"
            f"confusion_matrix_url={confusion_matrix_url}"
        ),
    ]
    # Request as many GPUs per pod as Lightning is told to use via `devices`,
    # instead of a fixed 2, so changing `gpus` keeps both in sync. Formatting the
    # pipeline parameter yields a placeholder resolved at run time, exactly like
    # the f-string arguments above.
    gpus_per_pod = f"{gpus}"

    # One master + one worker replica matches the default num_nodes=2; adjust the
    # worker replica count if num_nodes is changed.
    # delete_after_done=False keeps the PyTorchJob "pytorch-bert-dist" after the run.
    train_task = pytorch_job_op(
        name="pytorch-bert-dist",
        namespace=namespace,
        master_spec=_pytorch_replica_spec(1, training_args, gpus_per_pod),
        worker_spec=_pytorch_replica_spec(1, training_args, gpus_per_pod),
        delete_after_done=False
    ).after(copy_task)

    # Move the .mar archive and config.properties from the checkpoint directory
    # into the per-run layout referenced by `model_uri`.
    mar_folder_restructure_task = (
        dsl.ContainerOp(
            name='mar restructure',
            image='library/bash:4.4.23',
            command=['sh', '-c'],
            arguments=[
                f'mkdir -p {volume_mount_path}/{mar_path}; '
                f'mkdir -p {volume_mount_path}/{config_prop_path}; '
                f'cp {checkpoint_dir}/*.mar {volume_mount_path}/{mar_path}; '
                f'cp {checkpoint_dir}/config.properties {volume_mount_path}/{config_prop_path}'
            ],
        )
        .add_pvolumes({volume_mount_path: dsl.PipelineVolume(pvc=dist_volume)})
        .after(train_task)
        .set_display_name("Restructure MAR and config.properties path")
    )
    mar_folder_restructure_task.execution_options.caching_strategy.max_cache_staleness = "P0D"

    copy_tensorboard = (
        cp_op("false", "", "", tensorboard_root)
        .add_pvolumes({volume_mount_path: dsl.PipelineVolume(pvc=dist_volume)})
        .after(mar_folder_restructure_task)
        .set_display_name("Copy Tensorboard Logs")
    )
    copy_tensorboard.execution_options.caching_strategy.max_cache_staleness = "P0D"

    minio_tb_upload = (
        minio_op(
            bucket_name=log_bucket,
            folder_name=log_dir,
            input_path=copy_tensorboard.outputs["destination_path"],
            filename="",
        ).after(copy_tensorboard)
        .set_display_name("Tensorboard Events Pusher")
    )

    # Deploy inferenceservice in gpu
    gpu_count = "1"
    isvc_gpu_yaml = """
    apiVersion: "serving.kserve.io/v1beta1"
    kind: "InferenceService"
    metadata:
      name: {}
      namespace: {}
    spec:
      predictor:
        serviceAccountName: sa
        pytorch:
          storageUri: {}
          protocolVersion: v2
          resources:
            requests: 
              cpu: 4
              memory: 8Gi
            limits:
              cpu: 4
              memory: 8Gi
              nvidia.com/gpu: {}
    """.format(
        deploy, namespace, model_uri, gpu_count
    )

    deploy_task = (
        deploy_op(action="apply", inferenceservice_yaml=isvc_gpu_yaml)
        .after(minio_tb_upload)
        .set_display_name("Deployer")
    )
    deploy_task.execution_options.caching_strategy.max_cache_staleness = "P0D"

    # Inject the MinIO credentials as environment variables into every op.
    dsl.get_pipeline_conf().add_op_transformer(
        use_k8s_secret(
            secret_name="mlpipeline-minio-artifact",
            k8s_secret_key_to_env={
                "secretkey": "MINIO_SECRET_KEY",
                "accesskey": "MINIO_ACCESS_KEY",
            },
        )
    )

In [101]:
# Compile the pipeline function into a package that can be submitted to KFP.
compiler.Compiler().compile(
    pipeline_func=pytorch_bert,
    package_path="pytorch.tar.gz",
    type_check=True,
)

In [102]:
# Submit the compiled package as a run in the selected experiment.
run = client.run_pipeline(
    experiment_id=my_experiment.id,
    job_name="pytorch-bert",
    pipeline_package_path="pytorch.tar.gz",
)

## Wait for the inference service below to reach the `READY True` state.

In [103]:
!kubectl get isvc $DEPLOY

NAME        URL                                                      READY   PREV   LATEST   PREVROLLEDOUTREVISION   LATESTREADYREVISION                 AGE
bert-dist   http://bert-dist.kubeflow-user-example-com.example.com   True           100                              bert-dist-predictor-default-00001   4m12s


# Get the InferenceService host name

In [104]:
import subprocess
from urllib.parse import urlparse

# Fetch the InferenceService status with kubectl and parse it in Python.
# check=True raises on a kubectl failure instead of letting an error line be
# mistaken for the host name.
isvc_get = subprocess.run(
    ["kubectl", "get", "isvc", DEPLOY_NAME, "-n", NAMESPACE, "-o", "json"],
    check=True,
    capture_output=True,
    text=True,
)
isvc_url = json.loads(isvc_get.stdout).get("status", {}).get("url")
if not isvc_url:
    raise RuntimeError(
        f"InferenceService {DEPLOY_NAME!r} in namespace {NAMESPACE!r} has no status.url yet; "
        "wait until it is READY and re-run this cell."
    )
# status.url has the form http://<host>; the host goes into the request Host header.
INFERENCE_SERVICE_LIST = [urlparse(isvc_url).netloc]
INFERENCE_SERVICE_NAME = INFERENCE_SERVICE_LIST[0]
INFERENCE_SERVICE_NAME

'bert-dist.kubeflow-user-example-com.example.com'

# Prediction Request

In [105]:
!curl -v -H "Host: $INFERENCE_SERVICE_NAME" -H "Cookie: $COOKIE" "$INGRESS_GATEWAY/v2/models/$MODEL_NAME/infer" -d @./bert/sample.txt > bert_prediction_output.json

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:-- --:--:-- --:--:--     0*   Trying 10.100.176.44:80...
* TCP_NODELAY set
* Connected to istio-ingressgateway.istio-system.svc.cluster.local (10.100.176.44) port 80 (#0)
> POST /v1/models/bert:predict HTTP/1.1
> Host: bert-dist.kubeflow-user-example-com.example.com
> User-Agent: curl/7.68.0
> Accept: */*
> Cookie: authservice_session=<redacted>
> Content-Length: 84
> Content-Type: application/x-www-form-urlencoded
> 
} [84 bytes data]
* upload completely sent off: 84 out of 84 bytes
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< content-length: 33
< content-type: application/json; charset=UTF-8
< date: Mon, 02 May 202

In [106]:
# Display the raw prediction response written by the curl call.
with open("bert_prediction_output.json") as prediction_file:
    print(prediction_file.read(), end="")

{"predictions": ["\"Sci/Tech\""]}

# Explanation Request

In [107]:
!curl -v -H "Host: $INFERENCE_SERVICE_NAME" -H "Cookie: $COOKIE" "$INGRESS_GATEWAY/v2/models/$MODEL_NAME/explain" -d @./bert/sample.txt  > bert_explaination_output.json

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
  0     0    0     0    0     0      0      0 --:--:--  0:00:04 --:--:--     0*   Trying 10.100.176.44:80...
* TCP_NODELAY set
* Connected to istio-ingressgateway.istio-system.svc.cluster.local (10.100.176.44) port 80 (#0)
> POST /v1/models/bert:explain HTTP/1.1
> Host: bert-dist.kubeflow-user-example-com.example.com
> User-Agent: curl/7.68.0
> Accept: */*
> Cookie: authservice_session=<redacted>
> Content-Length: 84
> Content-Type: application/x-www-form-urlencoded
> 
} [84 bytes data]
* upload completely sent off: 84 out of 84 bytes
* Mark bundle as not supporting multiuse
< HTTP/1.1 200 OK
< content-length: 264
< content-type: application/json; charset=UTF-8
< date: Mon, 02 May 20

In [108]:
# Display the raw explanation response written by the curl call.
with open("bert_explaination_output.json") as explanation_file:
    print(explanation_file.read(), end="")

{"explanations": [{"words": ["bloomberg", "has", "reported", "on", "the", "economy"], "importances": [-0.49426081646662806, 0.09581777446473196, -0.09546984597236165, -0.19612933767921537, -0.2438196769639178, 0.7996849104110348], "delta": -0.005089809745116192}]}

In [16]:
# Load the explanation response saved by the explain request; `with` closes the file.
with open("./bert_explaination_output.json", "r") as explanation_file:
    explanations_json = json.load(explanation_file)
explanations_json

{'explanations': [{'words': ['[CLS]',
    'bloomberg',
    'has',
    'reported',
    'on',
    'the',
    'economy',
    '[SEP]'],
   'importances': [0.18556156547587432,
    -0.04754466449824699,
    -0.09005958599003015,
    0.056995451538874545,
    0.10996221573727777,
    0.148971232294231,
    0.398128678194734,
    -0.8712959534101352],
   'delta': 0.008833148050828438}]}

In [17]:
# Load the prediction response saved by the predict request; `with` closes the file.
with open("./bert_prediction_output.json", "r") as prediction_file:
    prediction_json = json.load(prediction_file)

In [23]:
import torch


def _first_record(response, v1_key):
    """Return the first payload record of a KServe v1 or v2 response.

    v1 responses have the form ``{v1_key: [record, ...]}`` (for example
    ``"predictions"`` or ``"explanations"``); v2 responses have the form
    ``{"outputs": [{"data": [record, ...]}]}``.
    """
    if v1_key in response:
        return response[v1_key][0]
    return response["outputs"][0]["data"][0]


# The service is deployed with protocolVersion v2, but the responses saved in
# this notebook use the v1 shape ("explanations" / "predictions"); accept both.
explanation_record = _first_record(explanations_json, "explanations")
attributions = torch.tensor(explanation_record['importances'])
tokens = explanation_record['words']
delta = explanation_record['delta']

# NOTE: pred_prob, true_class and attr_class are fixed example values for the
# Captum visualization; they are not read from the model response.
pred_prob = 0.75
pred_class = str(_first_record(prediction_json, "predictions")).strip('"')
true_class = "Business"
attr_class = "world"

# Visualization of Predictions

In [24]:
from captum.attr import visualization

# A single Captum record: attributions, prediction info, total attribution score,
# the input tokens and the convergence delta returned by the explainer.
vis_data_records = [
    visualization.VisualizationDataRecord(
        attributions,
        pred_prob,
        pred_class,
        true_class,
        attr_class,
        attributions.sum(),
        tokens,
        delta,
    )
]

In [25]:
# Render the records with Captum's text visualizer (columns: true label, predicted
# label, attribution label, attribution score, word importance).
vis = visualization.visualize_text(vis_data_records)

True Label,Predicted Label,Attribution Label,Attribution Score,Word Importance
Business,"""Sci/Tech"" (0.75)",world,-0.11,[CLS] bloomberg has reported on the economy [SEP]
,,,,


### The visualization appears as below
![viz1.png](./viz1.png)

## Cleanup Script

In [85]:
! kubectl delete --all isvc -n $NAMESPACE

inferenceservice.serving.kserve.io "bert-dist" deleted


In [84]:
# Remove completed pods left behind by the pipeline runs.
# NOTE: the field selector matches every pod in $NAMESPACE whose phase is
# Succeeded, including pods not created by this notebook.
! kubectl delete pod --field-selector=status.phase==Succeeded -n $NAMESPACE

pod "create-dist-pipeline-444nk-3959473792" deleted
pod "training-pipeline-trb5h-1876153621" deleted
pod "training-pipeline-trb5h-284914308" deleted
pod "training-pipeline-trb5h-3177383612" deleted
pod "training-pipeline-trb5h-3252145113" deleted
pod "training-pipeline-trb5h-3265872190" deleted
pod "training-pipeline-trb5h-3331631297" deleted
pod "training-pipeline-trb5h-3651310105" deleted
pod "training-pipeline-trb5h-3914481085" deleted
