# KubeFlow Pipeline: Github Issue Summarization using Tensor2Tensor

This notebook assumes that you have already set up a GKE cluster with CAIP Pipelines (Hosted KFP) installed, with the addition of a GPU-enabled node pool, as per this codelab: [g.co/codelabs/kubecon18](g.co/codelabs/kubecon18).

In this notebook, we will show how to:

* Interactively define a KubeFlow Pipeline using the Pipelines Python SDK
* Submit and run the pipeline
* Add a step in the pipeline

This example pipeline trains a [Tensor2Tensor](https://github.com/tensorflow/tensor2tensor/) model on Github issue data, learning to predict issue titles from issue bodies. It then exports the trained model and deploys the exported model to [Tensorflow Serving](https://github.com/tensorflow/serving). 
The final step in the pipeline launches a web app which interacts with the TF-Serving instance in order to get model predictions.

## Setup

Do some installations and imports, and set some variables.  Set the `WORKING_DIR` to a path under the Cloud Storage bucket you created earlier.  You may need to restart your kernel after the KFP SDK update.

In [None]:
!pip install -U kfp

In [None]:
# Restart kernel after the pip install
import IPython

IPython.Application.instance().kernel.do_shutdown(True)

In [None]:
import kfp  # the Pipelines SDK.  
from kfp import compiler
import kfp.dsl as dsl
import kfp.gcp as gcp
import kfp.components as comp
from kfp.dsl.types import Integer, GCSPath, String

import kfp.notebook

In [None]:
# Define some pipeline input variables. 
WORKING_DIR = 'gs://YOUR_GCS_BUCKET/t2t/notebooks' # Such as gs://bucket/object/path

PROJECT_NAME = 'YOUR_PROJECT'
GITHUB_TOKEN = 'YOUR_GITHUB_TOKEN'  # optional; used for prediction, to grab issue data from GH

DEPLOY_WEBAPP = 'false'  # change this to 'true' to deploy a new version of the webapp part of the pipeline

## Instantiate the KFP client and create an *Experiment* in the Kubeflow Pipeline System

Next we'll instantiate a KFP client object with the `host` info from your Hosted KFP installation.  To do this, go to the Pipelines dashboard in the Cloud Console and click on the "Settings" gear for the KFP installation that you want to use. You'll see a popup window. Look for the "Connect to this Kubeflow Pipelines instance..." text and copy the "client = kfp.Client(...)" line below it. Edit the following cell to use that line.

In [None]:
# CHANGE THIS with the info for your KFP cluster installation
client = kfp.Client(host='xxxxxxxx-dot-us-centralx.pipelines.googleusercontent.com')

The Kubeflow Pipeline system requires an "Experiment" to group pipeline runs. You can create a new experiment, or call `client.list_experiments()` to get existing ones. (This will also serve to check that your client is set up properly).

In [None]:
client.list_experiments()

In [None]:
exp = client.create_experiment(name='t2t_notebook')

## Define a Pipeline

Authoring a pipeline is like authoring a normal Python function. The pipeline function describes the topology of the pipeline. The pipeline components (steps) are container-based. For this pipeline, we're using a mix of predefined components loaded from their [component definition files](https://www.kubeflow.org/docs/pipelines/sdk/component-development/), and some components defined via [the `dsl.ContainerOp` constructor](https://www.kubeflow.org/docs/pipelines/sdk/build-component/).  For this codelab, we've prebuilt all the components' containers.

While not shown here, there are other ways to build Kubeflow Pipeline components as well, including converting stand-alone python functions to containers via [`kfp.components.func_to_container_op(func)`](https://www.kubeflow.org/docs/pipelines/sdk/lightweight-python-components/).  You can read more [here](https://www.kubeflow.org/docs/pipelines/sdk/).


This pipeline has several steps:

- An existing model checkpoint is copied to your bucket.
- Dataset metadata is logged to the Kubeflow metadata server.
- A [Tensor2Tensor](https://github.com/tensorflow/tensor2tensor/) model is trained using preprocessed data. (Training starts from the existing model checkpoint copied in the first step, then trains for a few more hundred steps-- it would take too long to fully train it now). When it finishes, it exports the model in a form suitable for serving by [TensorFlow serving](https://github.com/tensorflow/serving/).
- Training metadata is logged to the metadata server.
- The next step in the pipeline deploys a TensorFlow-serving instance using that model.
- The last step launches a web app for interacting with the served model to retrieve predictions.

We'll first define some constants and load some components from their definition files.

In [None]:
COPY_ACTION = 'copy_data'
TRAIN_ACTION = 'train'
DATASET = 'dataset'
MODEL = 'model'

copydata_op = comp.load_component_from_url(
  'https://raw.githubusercontent.com/kubeflow/examples/master/github_issue_summarization/pipelines/components/t2t/datacopy_component.yaml'  # pylint: disable=line-too-long
  )

train_op = comp.load_component_from_url(
  'https://raw.githubusercontent.com/kubeflow/examples/master/github_issue_summarization/pipelines/components/t2t/train_component.yaml' # pylint: disable=line-too-long
  )


Next, we'll define the pipeline itself.

In [None]:
@dsl.pipeline(
  name='Github issue summarization',
  description='Demonstrate Tensor2Tensor-based training and TF-Serving'
)
def gh_summ(
  train_steps: 'Integer' = 2019300,
  project: str = 'YOUR_PROJECT_HERE',
  github_token: str = 'YOUR_GITHUB_TOKEN_HERE',
  working_dir: 'GCSPath' = 'gs://YOUR_GCS_DIR_HERE',
  checkpoint_dir: 'GCSPath' = 'gs://aju-dev-demos-codelabs/kubecon/model_output_tbase.bak2019000/',
  deploy_webapp: str = 'true',
  data_dir: 'GCSPath' = 'gs://aju-dev-demos-codelabs/kubecon/t2t_data_gh_all/'
  ):

  copydata = copydata_op(
    data_dir=data_dir,
    checkpoint_dir=checkpoint_dir,
    model_dir='%s/%s/model_output' % (working_dir, dsl.RUN_ID_PLACEHOLDER),
    action=COPY_ACTION,
    )

  train = train_op(
    data_dir=data_dir,
    model_dir=copydata.outputs['copy_output_path'],
    action=TRAIN_ACTION, train_steps=train_steps,
    deploy_webapp=deploy_webapp
    )

  serve = dsl.ContainerOp(
      name='serve',
      image='gcr.io/google-samples/ml-pipeline-kubeflow-tfserve:v5',
      arguments=["--model_name", 'ghsumm-%s' % (dsl.RUN_ID_PLACEHOLDER,),
          "--model_path", train.outputs['train_output_path']
          ]
      )

  train.set_gpu_limit(1)

  with dsl.Condition(train.outputs['launch_server'] == 'true'):
    webapp = dsl.ContainerOp(
        name='webapp',
        image='gcr.io/google-samples/ml-pipeline-webapp-launcher:v7ap',
        arguments=["--model_name", 'ghsumm-%s' % (dsl.RUN_ID_PLACEHOLDER,),
            "--github_token", github_token]

        )
    webapp.after(serve)

## Submit an experiment *run*

In [None]:
compiler.Compiler().compile(gh_summ, 'ghsumm.tar.gz')

The call below will run the compiled pipeline.  We won't actually do that now, but instead we'll add a new step to the pipeline, then run it.

In [None]:
# You'd uncomment this call to actually run the pipeline. 
# run = client.run_pipeline(exp.id, 'ghsumm', 'ghsumm.tar.gz',
#                           params={'working_dir': WORKING_DIR,
#                                   'github_token': GITHUB_TOKEN,
#                                   'project': PROJECT_NAME})

## Add a step to the pipeline

Next, let's add a new step to the pipeline above.  As currently defined, the pipeline accesses a directory of pre-processed data as input to training.  Let's see how we could include the pre-processing as part of the pipeline. 

We're going to cheat a bit, as processing the full dataset will take too long for this workshop, so we'll use a smaller sample. For that reason, you won't actually make use of the generated data from this step (we'll stick to using the full dataset for training), but this shows how you could do so if we had more time.

First, we'll define the new pipeline step. Note the last line of this new function, which gives this step's pod the credentials to access GCP.

In [None]:
# defining the new data preprocessing pipeline step. 
# Note the last line, which gives this step's pod the credentials to access GCP
def preproc_op(data_dir, project):
  return dsl.ContainerOp(
    name='datagen',
    image='gcr.io/google-samples/ml-pipeline-t2tproc',
    arguments=[ "--data-dir", data_dir, "--project", project]
  )

### Modify the pipeline to add the new step

Now, we'll redefine the pipeline to add the new step. We're reusing the component ops defined above.

In [None]:
# Then define a new Pipeline. It's almost the same as the original one, 
# but with the addition of the data processing step.

@dsl.pipeline(
  name='Github issue summarization',
  description='Demonstrate Tensor2Tensor-based training and TF-Serving'
)
def gh_summ2(
  train_steps: 'Integer' = 2019300,
  project: str = 'YOUR_PROJECT_HERE',
  github_token: str = 'YOUR_GITHUB_TOKEN_HERE',
  working_dir: 'GCSPath' = 'YOUR_GCS_DIR_HERE',
  checkpoint_dir: 'GCSPath' = 'gs://aju-dev-demos-codelabs/kubecon/model_output_tbase.bak2019000/',
  deploy_webapp: str = 'true',
  data_dir: 'GCSPath' = 'gs://aju-dev-demos-codelabs/kubecon/t2t_data_gh_all/'
  ):

  # The new pre-processing op.
  preproc = preproc_op(project=project,
      data_dir=('%s/%s/gh_data' % (working_dir, dsl.RUN_ID_PLACEHOLDER)))

  copydata = copydata_op(
    data_dir=data_dir,
    checkpoint_dir=checkpoint_dir,
    model_dir='%s/%s/model_output' % (working_dir, dsl.RUN_ID_PLACEHOLDER),
    action=COPY_ACTION,
    )

  train = train_op(
    data_dir=data_dir,
    model_dir=copydata.outputs['copy_output_path'],
    action=TRAIN_ACTION, train_steps=train_steps,
    deploy_webapp=deploy_webapp
    )
  train.after(preproc)    

  serve = dsl.ContainerOp(
      name='serve',
      image='gcr.io/google-samples/ml-pipeline-kubeflow-tfserve:v5',
      arguments=["--model_name", 'ghsumm-%s' % (dsl.RUN_ID_PLACEHOLDER,),
          "--model_path", train.outputs['train_output_path']
          ]
      )

  train.set_gpu_limit(1)

  with dsl.Condition(train.outputs['launch_server'] == 'true'):
    webapp = dsl.ContainerOp(
        name='webapp',
        image='gcr.io/google-samples/ml-pipeline-webapp-launcher:v7ap',
        arguments=["--model_name", 'ghsumm-%s' % (dsl.RUN_ID_PLACEHOLDER,),
            "--github_token", github_token]

        )
    webapp.after(serve)

### Compile the new pipeline definition and submit the run

In [None]:
compiler.Compiler().compile(gh_summ2, 'ghsumm2.tar.gz')

In [None]:
run = client.run_pipeline(exp.id, 'ghsumm2', 'ghsumm2.tar.gz',
                          params={'working_dir': WORKING_DIR,
                                  'github_token': GITHUB_TOKEN,
                                  'deploy_webapp': DEPLOY_WEBAPP,
                                  'project': PROJECT_NAME})

You should be able to see your newly defined pipeline run in the dashboard:
![](https://storage.googleapis.com/amy-jo/images/kf-pls/t2t_pipeline_in_dashboard.png)

The new pipeline has the following structure:

![The new pipeline structure.](https://storage.googleapis.com/amy-jo/images/kf-pls/t2t_pipeline_structure.png)

Below is a screenshot of the pipeline running.

![The pipeline running.](https://storage.googleapis.com/amy-jo/images/kf-pls/t2t_pipeline_running.png)

When this new pipeline finishes running, you'll be able to see your generated processed data files in GCS under the path: `WORKING_DIR/<pipeline_name>/gh_data`. There isn't time in the workshop to pre-process the full dataset, but if there had been, we could have defined our pipeline to read from that generated directory for its training input.

-----------------------------
Copyright 2018 Google LLC

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.