In [None]:
# Copyright 2019 Google Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

# Reusable components

This tutorial describes the manual way of writing a full component program (in any language) and a component definition for it. Below is a summary of the steps involved in creating and using a component:

- Write the program that contains your component’s logic. The program must use files and command-line arguments to pass data to and from the component.
- Containerize the program.
- Write a component specification in YAML format that describes the component for the Kubeflow Pipelines system.
- Use the Kubeflow Pipelines SDK to load your component, use it in a pipeline and run that pipeline.

Note: If you want to build the image locally, make sure that Docker is installed by running the following command:
 
`which docker`
 
The result should be something like:

`/usr/bin/docker`

In [None]:
import datetime
import os

import kfp
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
import kfp.gcp as gcp
import kubernetes as k8s

In [None]:
# Required parameters: replace both placeholders before running the notebook.
PROJECT_ID = '<ADD GCP PROJECT HERE>'               # project whose gcr.io registry hosts the image
GCS_BUCKET = 'gs://<ADD STORAGE LOCATION HERE>'     # gs:// path for build staging, logs and the model

## Create client

If you run this notebook **outside** of a Kubeflow cluster, run the following command:
- `host`: The URL of your Kubeflow Pipelines instance, for example "https://`<your-deployment>`.endpoints.`<your-project>`.cloud.goog/pipeline"
- `client_id`: The client ID used by Identity-Aware Proxy
- `other_client_id`: The client ID used to obtain the auth codes and refresh tokens.
- `other_client_secret`: The client secret used to obtain the auth codes and refresh tokens.

```python
client = kfp.Client(host, client_id, other_client_id, other_client_secret)
```

If you run this notebook **within** a Kubeflow cluster, run the following command:
```python
client = kfp.Client()
```

You'll need to create OAuth client ID credentials of type `Other` to get `other_client_id` and `other_client_secret`. Learn more about [creating OAuth credentials](https://cloud.google.com/iap/docs/authentication-howto#authenticating_from_a_desktop_app).

In [None]:
# Optional parameters, required only when running outside a Kubeflow cluster.
# Each value may be supplied through an environment variable so that OAuth
# credentials (especially the client secret) do not have to be pasted into,
# and saved with, the notebook. The placeholder is used when the variable is unset.
HOST = os.environ.get(
    'KFP_HOST', '<ADD HOST NAME TO TALK TO KUBEFLOW PIPELINE HERE>')
CLIENT_ID = os.environ.get(
    'KFP_CLIENT_ID', '<ADD OAuth CLIENT ID USED BY IAP HERE>')
OTHER_CLIENT_ID = os.environ.get(
    'KFP_OTHER_CLIENT_ID', '<ADD OAuth CLIENT ID USED TO OBTAIN AUTH CODES HERE>')
OTHER_CLIENT_SECRET = os.environ.get(
    'KFP_OTHER_CLIENT_SECRET', '<ADD OAuth CLIENT SECRET USED TO OBTAIN AUTH CODES HERE>')

In [None]:
# Create the kfp client. If the in-cluster Kubernetes config loads, the notebook
# runs inside the cluster and kfp.Client() needs no arguments; otherwise the
# host / OAuth settings from the previous cell are used.
try:
    k8s.config.load_incluster_config()
    in_cluster = True
except k8s.config.ConfigException:
    # load_incluster_config raises ConfigException when the in-cluster
    # service environment is not available (i.e. running outside a pod).
    # Catching only this keeps Ctrl-C and unrelated errors visible.
    in_cluster = False

if in_cluster:
    client = kfp.Client()
else:
    client = kfp.Client(host=HOST,
                        client_id=CLIENT_ID,
                        other_client_id=OTHER_CLIENT_ID,
                        other_client_secret=OTHER_CLIENT_SECRET)

## Writing the program code

The following cell creates a file `app.py` that contains a Python script. The script downloads the MNIST dataset, trains a neural-network classification model, writes the training logs, and exports the trained model to Google Cloud Storage.

Your component can create outputs that the downstream components can use as inputs. Each output must be a string and the container image must write each output to a separate local text file. For example, if a training component needs to output the path of the trained model, the component writes the path into a local file, such as `/output.txt`.

In [None]:
%%bash

# Create folders if they don't exist.
mkdir -p tmp/reuse_components/mnist_training

# Create the Python training script (app.py). The heredoc delimiter is quoted
# so bash writes the script verbatim, without $-expansion or command substitution.
cat > ./tmp/reuse_components/mnist_training/app.py <<'HERE'
"""Train an MNIST classifier and copy the saved model to GCS.

The GCS path of the copied model is written to /output.txt so it can be
consumed as the component's output.
"""
import argparse
from datetime import datetime
import tensorflow as tf

parser = argparse.ArgumentParser()
parser.add_argument(
    '--model_file', type=str, required=True, help='Name of the model file.')
parser.add_argument(
    '--bucket', type=str, required=True, help='GCS bucket name.')
args = parser.parse_args()

bucket = args.bucket
model_file = args.model_file

model = tf.keras.models.Sequential([
  tf.keras.layers.Flatten(input_shape=(28, 28)),
  tf.keras.layers.Dense(512, activation=tf.nn.relu),
  tf.keras.layers.Dropout(0.2),
  tf.keras.layers.Dense(10, activation=tf.nn.softmax)
])

model.compile(optimizer='adam',
              loss='sparse_categorical_crossentropy',
              metrics=['accuracy'])

# summary() prints the table itself and returns None, so it is not wrapped in print().
model.summary()

mnist = tf.keras.datasets.mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()
# Normalize pixel intensities by 255.
x_train, x_test = x_train / 255.0, x_test / 255.0

callbacks = [
  # TensorBoard logs go to a per-day directory under the bucket.
  tf.keras.callbacks.TensorBoard(log_dir=bucket + '/logs/' + str(datetime.now().date())),
  # Interrupt training if val_loss stops improving for over 2 epochs
  tf.keras.callbacks.EarlyStopping(patience=2, monitor='val_loss'),
]

model.fit(x_train, y_train, batch_size=32, epochs=5, callbacks=callbacks,
          validation_data=(x_test, y_test))

# Save the model locally, then copy it to the bucket, replacing any previous copy.
model.save(model_file)

gcs_path = bucket + "/" + model_file
tf.io.gfile.copy(model_file, gcs_path, overwrite=True)

# Record the model location as the component's output.
with open('/output.txt', 'w') as f:
  f.write(gcs_path)
HERE

## Create a Docker container
Create your own container image that includes your program. 

### Creating a Dockerfile

Now create a container that runs the script. Start by creating a Dockerfile. A Dockerfile contains the instructions to assemble a Docker image. The `FROM` statement specifies the Base Image from which you are building. `WORKDIR` sets the working directory. When you assemble the Docker image, `COPY` copies the required files and directories (for example, `app.py`) to the file system of the container. `RUN` executes a command (for example, install the dependencies) and commits the results. 

In [None]:
%%bash

# Write the Dockerfile next to app.py; this directory is the image build context.
BUILD_DIR=./tmp/reuse_components/mnist_training
cat > "${BUILD_DIR}/Dockerfile" <<'DOCKERFILE'
FROM tensorflow/tensorflow:1.15.0-py3
WORKDIR /app
COPY . /app
DOCKERFILE

### Build docker image

Now that we have created the Dockerfile, we need to build the Docker image and push it to a registry that hosts the image.
- We are going to use `kfp.containers.build_image_from_working_dir` to build the image and push it to Google Container Registry (GCR). It uses [kaniko](https://cloud.google.com/blog/products/gcp/introducing-kaniko-build-container-images-in-kubernetes-and-google-container-builder-even-without-root-access).
- Alternatively, you can build the image locally using Docker and then push it to GCR.

**Note**:
If you run this notebook **within a Kubeflow cluster** **with Kubeflow version >= 0.7**, you need to ensure that valid credentials are created within your notebook's namespace.
- With Kubeflow version >= 0.7, the credentials are supposed to be copied automatically when you create the notebook through `Configurations`, but this did not work properly at the time this notebook was written.
- You can also add credentials to the new namespace by either [copying credentials from an existing Kubeflow namespace, or by creating a new service account](https://www.kubeflow.org/docs/gke/authentication/#kubeflow-v0-6-and-before-gcp-service-account-key-as-secret).
- The following snippet demonstrates how to copy the default secret to your own namespace.

```bash
%%bash

NAMESPACE="<your notebook namespace>"
SOURCE=kubeflow
NAME=user-gcp-sa
SECRET=$(kubectl get secrets ${NAME} -n ${SOURCE} -o jsonpath="{.data.${NAME}\.json}" | base64 --decode)
kubectl create -n ${NAMESPACE} secret generic ${NAME} --from-literal="${NAME}.json=${SECRET}"
```

In [None]:
IMAGE_NAME = "mnist_training_kf_pipeline"
# Fixed tag. For a unique tag per build, use something like
# datetime.datetime.now().strftime("v_%Y%m%d_%H%M%S") instead.
TAG = "latest"

# Fully-qualified image URI in the project's Container Registry.
GCR_IMAGE = "gcr.io/{}/{}:{}".format(PROJECT_ID, IMAGE_NAME, TAG)

# The builder is given a GCS location (its `gcs_staging` argument) under the bucket.
builder = kfp.containers._container_builder.ContainerBuilder(
    gcs_staging=GCS_BUCKET + "/kfp_container_build_staging",
)

# Build the directory holding app.py and the Dockerfile into GCR_IMAGE;
# the returned image name is displayed below.
image_name = kfp.containers.build_image_from_working_dir(
    image_name=GCR_IMAGE,
    working_dir='./tmp/reuse_components/mnist_training/',
    builder=builder,
)

image_name

#### If you want to use docker to build the image
Run the following in a cell
```bash
%%bash -s "{PROJECT_ID}"

IMAGE_NAME="mnist_training_kf_pipeline"
TAG="latest" # "v_$(date +%Y%m%d_%H%M%S)"

# Create script to build docker image and push it.
cat > ./tmp/reuse_components/mnist_training/build_image.sh <<HERE
PROJECT_ID="${1}"
IMAGE_NAME="${IMAGE_NAME}"
TAG="${TAG}"
GCR_IMAGE="gcr.io/\${PROJECT_ID}/\${IMAGE_NAME}:\${TAG}"
docker build -t \${IMAGE_NAME} .
docker tag \${IMAGE_NAME} \${GCR_IMAGE}
docker push \${GCR_IMAGE}
docker image rm \${IMAGE_NAME}
docker image rm \${GCR_IMAGE}
HERE

cd tmp/reuse_components/mnist_training
bash build_image.sh
```

**Remember to set the image_name after the image is built**
```python
image_name = <the image uri>
```

## Writing your component definition file
To create a component from your containerized program, you must write a component specification in YAML that describes the component for the Kubeflow Pipelines system.

For the complete definition of a Kubeflow Pipelines component, see the [component specification](https://www.kubeflow.org/docs/pipelines/reference/component-spec/). However, for this tutorial you don’t need to know the full schema of the component specification. The notebook provides enough information to complete the tutorial.

Start writing the component definition (component.yaml) by specifying your container image in the component’s implementation section:

In [None]:
%%bash -s "{image_name}"

# The image URI is passed in from the Python variable `image_name` via `-s`.
GCR_IMAGE="${1}"
echo "${GCR_IMAGE}"

# Create the component spec YAML. The heredoc delimiter is intentionally left
# unquoted so that ${GCR_IMAGE} is substituted into the `image:` field.
cat > mnist_component.yaml <<HERE
name: Mnist training
description: Train a mnist model and save to GCS
inputs:
  - name: model_file
    description: 'Name of the model file.'
    type: String
  - name: bucket
    description: 'GCS bucket name.'
    type: String
outputs:
  - name: model_path
    description: 'Trained model path.'
    type: GCSPath
implementation:
  container:
    image: ${GCR_IMAGE}
    command: [
      python, /app/app.py,
      --model_file, {inputValue: model_file},
      --bucket,     {inputValue: bucket},
    ]
    fileOutputs:
      model_path: /output.txt
HERE

### Create your workflow as a Python function

Define your pipeline as a Python function. `@kfp.dsl.pipeline` is a required decorator and should include `name` and `description` properties. Then compile the pipeline function. After the compilation is completed, a pipeline file is created.

In [None]:
import os

# Load the component spec written by the previous bash cell. The resulting
# mnist_train_op is called inside the pipeline function to add a training step.
mnist_train_op = comp.load_component_from_file(
    os.path.join('.', 'mnist_component.yaml'))

In [None]:
# Display the component specification parsed from mnist_component.yaml.
mnist_train_op.component_spec

In [None]:
# Define the pipeline
@dsl.pipeline(
    name='Mnist pipeline',
    description='A toy pipeline that performs mnist model training.'
)
def mnist_reuse_component_pipeline(
    model_file: str = 'mnist_model.h5',
    bucket: str = GCS_BUCKET
):
    """Single-step pipeline that runs the reusable MNIST training component.

    The body only declares pipeline steps; it does not need to return anything.

    Args:
        model_file: File name the trained model is saved under, locally in the
            container and under `bucket`.
        bucket: gs:// location where TensorBoard logs and the model are written.
    """
    # Attach the `user-gcp-sa` GCP credentials secret so the step can access GCS.
    mnist_train_op(model_file=model_file, bucket=bucket).apply(
        gcp.use_gcp_secret('user-gcp-sa'))

### Submit a pipeline run

In [None]:
# The pipeline function submitted (or compiled) by the cells below.
pipeline_func = mnist_reuse_component_pipeline

In [None]:
# Experiment under which the run is created.
experiment_name = 'mnist_kubeflow'

# Runtime values for the pipeline parameters (identical to the function defaults).
arguments = {"model_file": "mnist_model.h5",
             "bucket": GCS_BUCKET}

run_name = pipeline_func.__name__ + ' run'

# Submit the pipeline directly from the pipeline function (no package file is written).
run_result = client.create_run_from_pipeline_func(pipeline_func,
                                                  experiment_name=experiment_name,
                                                  run_name=run_name,
                                                  arguments=arguments)

**As an alternative, you can compile the pipeline into a package.** The compiled pipeline can be easily shared and reused by others to run the pipeline.

```python
pipeline_filename = pipeline_func.__name__ + '.pipeline.zip'
compiler.Compiler().compile(pipeline_func, pipeline_filename)

experiment = client.create_experiment('python-functions-mnist')

run_result = client.run_pipeline(
    experiment_id=experiment.id, 
    job_name=run_name, 
    pipeline_package_path=pipeline_filename, 
    params=arguments)
```