In [1]:
# Copyright (c) Facebook, Inc. and its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# BERT Pipeline: PyTorch BERT News Classification

This notebook shows an end-to-end PyTorch BERT news classification example using Kubeflow Pipelines.


An example notebook that demonstrates how to:

* Get different tasks needed for the pipeline
* Create a Kubeflow pipeline
* Include PyTorch KFP components to preprocess, train, visualize and deploy the model in the pipeline
* Submit a job for execution
* Query (predict and explain) the final deployed model
* Interpret the model using Captum Insights


In [44]:
! pip uninstall -y kfp
! pip install --no-cache-dir kfp captum

In [3]:
import kfp
import json
import os
from kfp.onprem import use_k8s_secret
from kfp import components
from kfp.components import load_component_from_file, load_component_from_url
from kfp import dsl
from kfp import compiler

kfp.__version__

'1.8.12'

# Enter your gateway and the cookie
[Use this extension on Chrome to get the token](https://chrome.google.com/webstore/detail/editthiscookie/fngmhnnpilhplaeedifhccceomclgfbg?hl=en)

![image.png](./image.png)

## Update values for the ingress gateway and auth session

In [4]:
# Connection settings for the Kubeflow deployment.
INGRESS_GATEWAY = "http://istio-ingressgateway.istio-system.svc.cluster.local"

# Session token for the auth service. Prefer exporting KF_AUTH_SESSION before
# starting Jupyter so the token is never saved in the notebook; the placeholder
# is kept as a fallback for editing the value in place.
AUTH = os.environ.get("KF_AUTH_SESSION", "<enter your token here>")

NAMESPACE = "kubeflow-user-example-com"
# Cookie string sent to the KFP client and on the curl requests further down.
COOKIE = "authservice_session=" + AUTH
EXPERIMENT = "Default"

## Set Log bucket and Tensorboard Image

In [5]:
# Object store endpoint handed to the Tensorboard pod as S3_ENDPOINT.
MINIO_ENDPOINT = "http://minio-service.kubeflow:9000"
# Bucket used for Tensorboard logs and the confusion matrix.
LOG_BUCKET = "mlpipeline"
# Container image used by the Tensorboard visualization component.
TENSORBOARD_IMAGE = "public.ecr.aws/pytorch-samples/tboard:latest"

In [6]:
# Kubeflow Pipelines client, reached through the ingress gateway and
# authenticated with the session cookie.
client = kfp.Client(
    host=f"{INGRESS_GATEWAY}/pipeline",
    cookies=COOKIE,
)

In [7]:
# Use the experiment returned by create_experiment directly. Picking
# `list_experiments(...).experiments[0]` would select whichever experiment is
# listed first in the namespace, which need not be the one named EXPERIMENT.
my_experiment = client.create_experiment(EXPERIMENT, namespace=NAMESPACE)
my_experiment

{'created_at': datetime.datetime(2022, 4, 21, 9, 45, 22, tzinfo=tzlocal()),
 'description': None,
 'id': 'b4bee8c3-381b-42a0-9494-bc81eb9aa359',
 'name': 'Default',
 'resource_references': [{'key': {'id': 'kubeflow-user-example-com',
                                  'type': 'NAMESPACE'},
                          'name': None,
                          'relationship': 'OWNER'}],
 'storage_state': 'STORAGESTATE_AVAILABLE'}

## Set Inference parameters

In [8]:
# Name given to the InferenceService created by the pipeline.
DEPLOY_NAME = "bertserve"
# Model name used in the /v2/models/<name>/... request paths.
MODEL_NAME = "bert"

In [14]:
# Run the template generator with the BERT mapping file. The component YAML
# files loaded in the next cell are presumably produced by this step
# (NOTE(review): confirm the output directory is ./yaml).
! python utils/generate_templates.py bert/template_mapping.json

Processing prediction_component.yaml
Processing ax_complete_trials_component.yaml
Processing preprocess_component.yaml
Processing train_component.yaml
Processing tensorboard_component.yaml
Processing ax_generate_trials_component.yaml
Processing minio_component.yaml
Processing copy_component.yaml
Processing ax_train_component.yaml


In [15]:
# Load every pipeline component from its YAML definition. The same loader
# spelling is used throughout for consistency.
prepare_tensorboard_op = load_component_from_file("yaml/tensorboard_component.yaml")
prep_op = load_component_from_file("yaml/preprocess_component.yaml")
train_op = load_component_from_file("yaml/train_component.yaml")
# The KServe deployer comes from the shared components directory rather than ./yaml.
deploy_op = load_component_from_file("../../../components/kserve/component.yaml")
minio_op = load_component_from_file("yaml/minio_component.yaml")

## Define pipeline

In [19]:
def _tensorboard_pod_template_spec(minio_endpoint):
    """Return the JSON pod template spec for the Tensorboard component.

    The spec gives the Tensorboard container its object-store credentials
    (from the ``mlpipeline-minio-artifact`` secret) and S3 settings that point
    at ``minio_endpoint`` without HTTPS or SSL verification.
    """
    secret_name = "mlpipeline-minio-artifact"

    def _secret_env(env_name, secret_key):
        # Environment variable whose value is read from the Kubernetes secret.
        return {
            "name": env_name,
            "valueFrom": {
                "secretKeyRef": {
                    "name": secret_name,
                    "key": secret_key,
                }
            },
        }

    return json.dumps({
        "spec": {
            "containers": [{
                "env": [
                    _secret_env("AWS_ACCESS_KEY_ID", "accesskey"),
                    _secret_env("AWS_SECRET_ACCESS_KEY", "secretkey"),
                    {
                        "name": "AWS_REGION",
                        "value": "minio"
                    },
                    {
                        "name": "S3_ENDPOINT",
                        "value": f"{minio_endpoint}",
                    },
                    {
                        "name": "S3_USE_HTTPS",
                        "value": "0"
                    },
                    {
                        "name": "S3_VERIFY_SSL",
                        "value": "0"
                    },
                ]
            }]
        }
    })


@dsl.pipeline(name="Training pipeline", description="Sample training job test")
def pytorch_bert( # pylint: disable=too-many-arguments
    minio_endpoint=MINIO_ENDPOINT,
    log_bucket=LOG_BUCKET,
    log_dir=f"tensorboard/logs/{dsl.RUN_ID_PLACEHOLDER}",
    mar_path=f"mar/{dsl.RUN_ID_PLACEHOLDER}/model-store",
    config_prop_path=f"mar/{dsl.RUN_ID_PLACEHOLDER}/config",
    model_uri=f"s3://mlpipeline/mar/{dsl.RUN_ID_PLACEHOLDER}",
    tf_image=TENSORBOARD_IMAGE,
    deploy=DEPLOY_NAME,
    namespace=NAMESPACE,
    confusion_matrix_log_dir=f"confusion_matrix/{dsl.RUN_ID_PLACEHOLDER}/",
    num_samples=1000,
    max_epochs=1
):
    """Define the BERT pipeline: visualize, preprocess, train, upload, deploy.

    Args:
        minio_endpoint: Object-store endpoint exposed to Tensorboard as S3_ENDPOINT.
        log_bucket: Bucket holding the Tensorboard logs and the confusion matrix.
        log_dir: Folder inside ``log_bucket`` for the Tensorboard event files.
        mar_path: Folder in the ``mlpipeline`` bucket that receives the model archive.
        config_prop_path: Folder in the ``mlpipeline`` bucket that receives
            ``config.properties``.
        model_uri: Storage URI placed in the InferenceService spec.
        tf_image: Container image for the Tensorboard component.
        deploy: Name of the InferenceService to create.
        namespace: Namespace of the InferenceService.
        confusion_matrix_log_dir: Folder inside ``log_bucket`` for the confusion matrix.
        num_samples: Forwarded to the training script as ``num_samples``.
        max_epochs: Forwarded to the trainer as ``max_epochs``.
    """
    prepare_tb_task = prepare_tensorboard_op(
        log_dir_uri=f"s3://{log_bucket}/{log_dir}",
        image=tf_image,
        pod_template_spec=_tensorboard_pod_template_spec(minio_endpoint),
    ).set_display_name("Visualization")

    prep_task = (
        prep_op().after(prepare_tb_task
                       ).set_display_name("Preprocess & Transform")
    )
    confusion_matrix_url = f"minio://{log_bucket}/{confusion_matrix_log_dir}"
    script_args = f"model_name=bert.pth," \
                  f"num_samples={num_samples}," \
                  f"confusion_matrix_url={confusion_matrix_url}"
    # For GPU , set device count and strategy type
    # NOTE(review): accelerator=gpu together with devices=0 looks inconsistent
    # for a default run; confirm how the training component interprets these.
    ptl_args = f"max_epochs={max_epochs},accelerator=gpu,profiler=pytorch,devices=0,strategy=None"
    train_task = (
        train_op(
            input_data=prep_task.outputs["output_data"],
            script_args=script_args,
            ptl_arguments=ptl_args
        ).after(prep_task).set_display_name("Training")
        # For allocating resources, uncomment below lines
        # .set_memory_request('600M')
        # .set_memory_limit('1200M')
        # .set_cpu_request('700m')
        # .set_cpu_limit('1400m')
        # For GPU uncomment below line and set GPU limit and node selector
        # .set_gpu_limit(1).add_node_selector_constraint('cloud.google.com/gke-accelerator','nvidia-tesla-p4')
    )

    # Push the Tensorboard events to the same bucket the Tensorboard component
    # reads from (s3://{log_bucket}/{log_dir}); a hard-coded bucket here would
    # silently diverge when log_bucket is overridden.
    (
        minio_op(
            bucket_name=log_bucket,
            folder_name=log_dir,
            input_path=train_task.outputs["tensorboard_root"],
            filename="",
        ).after(train_task).set_display_name("Tensorboard Events Pusher")
    )
    # The model archive and its config stay in "mlpipeline" to match the
    # default model_uri (s3://mlpipeline/mar/...).
    minio_mar_upload = (
        minio_op(
            bucket_name="mlpipeline",
            folder_name=mar_path,
            input_path=train_task.outputs["checkpoint_dir"],
            filename="bert_test.mar",
        ).after(train_task).set_display_name("Mar Pusher")
    )
    (
        minio_op(
            bucket_name="mlpipeline",
            folder_name=config_prop_path,
            input_path=train_task.outputs["checkpoint_dir"],
            filename="config.properties",
        ).after(train_task).set_display_name("Config Pusher")
    )

    model_uri = str(model_uri)
    # pylint: disable=unused-variable
    isvc_yaml = """
    apiVersion: "serving.kserve.io/v1beta1"
    kind: "InferenceService"
    metadata:
      name: {}
      namespace: {}
    spec:
      predictor:
        serviceAccountName: sa
        pytorch:
          protocolVersion: v2
          storageUri: {}
          resources:
            requests: 
              cpu: 4
              memory: 8Gi
            limits:
              cpu: 4
              memory: 8Gi
    """.format(deploy, namespace, model_uri)

    # For GPU inference use below yaml with gpu count and accelerator
    gpu_count = "1"
    accelerator = "nvidia-tesla-p4"
    isvc_gpu_yaml = """
    apiVersion: "serving.kserve.io/v1beta1"
    kind: "InferenceService"
    metadata:
      name: {}
      namespace: {}
    spec:
      predictor:
        serviceAccountName: sa
        pytorch:
          protocolVersion: v2
          storageUri: {}
          resources:
            requests: 
              cpu: 4
              memory: 8Gi
            limits:
              cpu: 4
              memory: 8Gi
              nvidia.com/gpu: {}
          nodeSelector:
            cloud.google.com/gke-accelerator: {}
""".format(deploy, namespace, model_uri, gpu_count, accelerator)
    # Update inferenceservice_yaml for GPU inference
    # Deployment waits only for the model archive upload.
    deploy_task = (
        deploy_op(action="apply", inferenceservice_yaml=isvc_yaml
                 ).after(minio_mar_upload).set_display_name("Deployer")
    )

    # Expose the MinIO credentials from the Kubernetes secret as environment
    # variables via a pipeline-wide op transformer.
    dsl.get_pipeline_conf().add_op_transformer(
        use_k8s_secret(
            secret_name="mlpipeline-minio-artifact",
            k8s_secret_key_to_env={
                "secretkey": "MINIO_SECRET_KEY",
                "accesskey": "MINIO_ACCESS_KEY",
            },
        )
    )

In [20]:
# Compile the pipeline function into a package file, with component type checking enabled.
compiler.Compiler().compile(
    pipeline_func=pytorch_bert,
    package_path='pytorch.tar.gz',
    type_check=True,
)

In [21]:
# Submit the compiled package as a run named 'pytorch-bert' in the chosen experiment.
run = client.run_pipeline(
    experiment_id=my_experiment.id,
    job_name='pytorch-bert',
    pipeline_package_path='pytorch.tar.gz',
)

## Wait for the inference service below to reach the `READY True` state.

In [22]:
!kubectl get isvc $DEPLOY

NAME        URL                                                      READY   PREV   LATEST   PREVROLLEDOUTREVISION   LATESTREADYREVISION                 AGE
bertserve   http://bertserve.kubeflow-user-example-com.example.com   True           100                              bertserve-predictor-default-00003   160m


# Get InferenceService name

In [23]:
# Read the InferenceService's status URL and keep only its host part: the
# inline Python prints status.url, `tr` drops any double quotes, and `cut`
# takes the third "/"-separated field (the text after "scheme://").
# The `!` capture returns a list of output lines; the first line is the host.
INFERENCE_SERVICE_LIST = ! kubectl get isvc {DEPLOY_NAME} -n {NAMESPACE} -o json | python3 -c "import sys, json; print(json.load(sys.stdin)['status']['url'])"| tr -d '"' | cut -d "/" -f 3
INFERENCE_SERVICE_NAME = INFERENCE_SERVICE_LIST[0]
INFERENCE_SERVICE_NAME

'bertserve.kubeflow-user-example-com.example.com'

# Prediction Request

In [24]:
# Show the request payload sent to the predict and explain endpoints.
! cat ./bert/sample.txt

{
  "id": "d3b15cad-50a2-4eaf-80ce-8b0a428bd298",
  "inputs": [{
    "name": "4b7c7d4a-51e4-43c8-af61-04639f6ef4bc",
    "shape": -1,
    "datatype": "BYTES",
    "data": "Bloomberg has reported on the economy"
  }
  ]
}

In [45]:
!curl -v -H "Host: $INFERENCE_SERVICE_NAME" -H "Cookie: $COOKIE" "$INGRESS_GATEWAY/v2/models/$MODEL_NAME/infer" -d @./bert/sample.txt > bert_prediction_output.json

In [26]:
# Print the saved prediction response.
! cat bert_prediction_output.json

{"id": "d3b15cad-50a2-4eaf-80ce-8b0a428bd298", "model_name": "bert_test", "model_version": "1", "outputs": [{"name": "predict", "shape": [], "datatype": "BYTES", "data": ["\"Business\""]}]}

# Explanation Request

In [41]:
!curl -v -H "Host: $INFERENCE_SERVICE_NAME" -H "Cookie: $COOKIE" "$INGRESS_GATEWAY/v2/models/$MODEL_NAME/explain" -d @./bert/sample.txt  > bert_explaination_output.json

In [31]:
# Print the saved explanation response.
! cat bert_explaination_output.json

{"id": "d3b15cad-50a2-4eaf-80ce-8b0a428bd298", "model_name": "bert_test", "model_version": "1", "outputs": [{"name": "explain", "shape": [], "datatype": "BYTES", "data": [{"words": ["bloomberg", "has", "reported", "on", "the", "economy"], "importances": [0.2124089759942075, 0.3070123112652129, -0.3175794877732026, -0.4493290921520886, -0.23262562691072097, 0.7097589881393321], "delta": 0.01156902069987975}]}]}

In [43]:
# Parse the saved explanation response. The context manager closes the file
# handle, which a bare open(...).read() leaves to the garbage collector.
with open("./bert_explaination_output.json", "r") as explanation_file:
    explanations_json = json.load(explanation_file)
explanations_json

In [33]:
# Parse the saved prediction response. The context manager closes the file
# handle, which a bare open(...).read() leaves to the garbage collector.
with open("./bert_prediction_output.json", "r") as prediction_file:
    prediction_json = json.load(prediction_file)

In [38]:
import torch

# Every explanation field is read from the first data entry of the first output.
explanation_record = explanations_json["outputs"][0]["data"][0]
tokens = explanation_record['words']
delta = explanation_record['delta']
attributions = torch.tensor(explanation_record['importances'])

# str.strip treats its argument as a set of characters, so this removes the
# double quotes surrounding the predicted label.
pred_class = str(prediction_json["outputs"][0]["data"][0]).strip('""')

# Fixed values used for the visualization; they are not read from the responses.
pred_prob = 0.75
true_class = "Business"
attr_class = "world"

# Visualization of Predictions

In [39]:
from captum.attr import visualization

# One visualization record for the single sample; arguments are positional,
# in the same order as before.
sample_record = visualization.VisualizationDataRecord(
    attributions,
    pred_prob,
    pred_class,
    true_class,
    attr_class,
    attributions.sum(),  # overall attribution score
    tokens,
    delta,
)
vis_data_records = [sample_record]

In [40]:
# Render the collected records as the word-importance table shown below.
vis = visualization.visualize_text(vis_data_records)

True Label,Predicted Label,Attribution Label,Attribution Score,Word Importance
Business,Business (0.75),world,0.23,bloomberg has reported on the economy
,,,,


### Visualization appears as below
![viz1.png](./viz1.png)

## Cleanup Script

In [10]:
# WARNING: `--all` deletes every InferenceService in NAMESPACE, not only the
# one created by this notebook.
# NOTE(review): consider deleting just DEPLOY_NAME if the namespace is shared.
! kubectl delete --all isvc -n $NAMESPACE

In [11]:
# Delete the pods in NAMESPACE whose phase is Succeeded.
! kubectl delete pod --field-selector=status.phase==Succeeded -n $NAMESPACE