# Train and deploy on Kubeflow from Notebooks

This notebook introduces you to using Kubeflow Fairing to train and deploy a model to Kubeflow on Google Kubernetes Engine (GKE), and Kubeflow Pipelines to build a simple pipeline and run it on GKE. This notebook demonstrates how to:
 
* Train an XGBoost model in a local notebook,
* Use Kubeflow Fairing to train an XGBoost model remotely on Kubeflow,
  * For simplicity, code-generated synthetic data is used.
  * The append builder is used to rapidly build a docker image.
* Use Kubeflow Fairing to deploy a trained model to Kubeflow, and call the deployed endpoint for predictions.
* Use a simple pipeline to train a model in GKE. 

To learn more about how to run this notebook locally, see the guide to [training and deploying on GCP from a local notebook][gcp-local-notebook].

[gcp-local-notebook]: https://kubeflow.org/docs/fairing/gcp/tutorials/gcp-local-notebook/

## Set up your notebook for training an XGBoost model

Import the libraries required to train this model.

In [None]:
!pip3 install retrying
!pip3 install fairing
!pip3 install kfmd

In [32]:
import util
from pathlib import Path
import os

# Project-local helper from util.py. NOTE(review): presumably prepares the
# notebook environment (e.g. logging, which the INFO lines printed by later
# cells suggest is configured) — TODO confirm what it actually does.
util.notebook_setup()


In [33]:
# fairing:include-cell
import fire
import joblib
import logging
import kfmd
import nbconvert
import os
import pathlib
import sys
from pathlib import Path
import pandas as pd
import pprint
from sklearn.metrics import mean_absolute_error
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from xgboost import XGBRegressor
from importlib import reload
from sklearn.datasets import make_regression
from kfmd import metadata
from datetime import datetime


In [34]:
# Imports not to be included in the built docker image
import kfp
import kfp.components as comp
import kfp.gcp as gcp
import kfp.dsl as dsl
import kfp.compiler as compiler
from kubernetes import client as k8s_client
import fairing   
from fairing.builders import append
from fairing.deployers import job
from fairing.preprocessors.converted_notebook import ConvertNotebookPreprocessorWithFire


In [35]:
# fairing:include-cell
def read_synthetic_input(test_size=0.25, n_samples=200, n_features=5,
                         noise=0.1, random_state=0):
    """Generate a synthetic regression dataset and split it into train and test.

    Args:
      test_size: Fraction of the samples held out for the test split.
      n_samples: Total number of samples to generate.
      n_features: Number of input features per sample.
      noise: Standard deviation of the Gaussian noise added to the targets.
      random_state: Seed passed to ``make_regression``. With a fixed seed every
        call produces the same dataset, so data generated in a later cell (for
        local or remote prediction) comes from the same underlying problem the
        model was trained on. Pass ``None`` to draw a fresh dataset each call.

    Returns:
      ``((train_X, train_y), (test_X, test_y))``.
    """
    X, y = make_regression(n_samples=n_samples,
                           n_features=n_features,
                           noise=noise,
                           random_state=random_state)
    # make_regression shuffles its samples by default, so a sequential
    # (non-shuffled) split is still a random split.
    train_X, test_X, train_y, test_y = train_test_split(X,
                                                        y,
                                                        test_size=test_size,
                                                        shuffle=False)

    # The synthetic data contains no missing values; the imputer is kept so
    # the flow mirrors one that would handle real data with NaNs. It is fit on
    # the training split only and then applied to the test split.
    imputer = SimpleImputer()
    train_X = imputer.fit_transform(train_X)
    test_X = imputer.transform(test_X)

    return (train_X, train_y), (test_X, test_y)


In [36]:
# fairing:include-cell
def train_model(train_X,
                train_y,
                test_X,
                test_y,
                n_estimators,
                learning_rate):
    """Train an XGBRegressor, early-stopping on the held-out split.

    Args:
      train_X, train_y: Training features and targets.
      test_X, test_y: Evaluation split used for early stopping.
      n_estimators: Maximum number of boosting rounds.
      learning_rate: Boosting learning rate.

    Returns:
      The fitted ``XGBRegressor``.
    """
    model = XGBRegressor(n_estimators=n_estimators, learning_rate=learning_rate)

    # Stop once the eval-set metric has not improved for 40 rounds.
    # NOTE(review): recent xgboost releases moved early_stopping_rounds from
    # fit() to the constructor; this matches the 0.9.0 API recorded in
    # ModelServe's metadata.
    model.fit(train_X,
              train_y,
              early_stopping_rounds=40,
              eval_set=[(test_X, test_y)])

    # best_iteration is 0-based, hence +1 for the number of rounds used.
    # Format the message explicitly: print() does not interpret %-placeholders,
    # so the old call emitted the raw template followed by the values.
    print("Best RMSE on eval: %.2f with %d rounds" %
          (model.best_score, model.best_iteration + 1))
    return model

def eval_model(model, test_X, test_y):
    """Evaluate the model on the test split.

    Args:
      model: Fitted regressor exposing ``predict``.
      test_X, test_y: Test features and true targets.

    Returns:
      The mean absolute error of the model's predictions.
    """
    predictions = model.predict(test_X)
    # sklearn metrics take (y_true, y_pred). MAE is symmetric, so the value is
    # unchanged, but the documented order keeps an asymmetric replacement
    # metric correct.
    mae = mean_absolute_error(test_y, predictions)
    logging.info("mean_absolute_error=%.2f", mae)
    return mae

def save_model(model, model_file):
    """Serialize ``model`` to ``model_file`` with joblib.

    ``ModelServe.predict`` reloads this file with ``joblib.load`` for serving.
    """
    joblib.dump(value=model, filename=model_file)
    logging.info("Model export success: %s", model_file)

Define various constants

## Define Train and Predict functions

In [37]:
# fairing:include-cell
class ModelServe(object):
    """Train the synthetic XGBoost model and serve predictions from its file.

    The same class is the Fire entry point of the converted training script
    (invoked with the ``train`` command) and the serving class handed to the
    fairing ``Serving`` deployer.
    """

    def __init__(self, model_file=None):
        """Resolve the model file and set the training hyperparameters.

        Args:
          model_file: Path of the serialized model. When falsy, the
            ``MODEL_FILE`` environment variable is used if set, otherwise
            ``"mockup-model.dat"``.
        """
        self.n_estimators = 50
        self.learning_rate = 0.1
        if not model_file:
            if "MODEL_FILE" in os.environ:
                print("model_file not supplied; checking environment variable")
                model_file = os.getenv("MODEL_FILE")
            else:
                print("model_file not supplied; using the default")
                model_file = "mockup-model.dat"

        self.model_file = model_file
        print("model_file={0}".format(self.model_file))

        self.model = None
        # Only train() logs to Kubeflow metadata, so the execution is created
        # on first access of ``self.exec`` rather than here. Predict-only users
        # (local prediction, the serving deployment) therefore no longer set up
        # a metadata workspace/run/execution they never use.
        self._exec = None

    @property
    def exec(self):
        """Metadata execution for this instance, created on first access."""
        if self._exec is None:
            self._exec = self.create_execution()
        return self._exec

    @exec.setter
    def exec(self, value):
        self._exec = value

    def train(self):
        """Train, evaluate and save the model, logging each step to metadata.

        Logs the synthetic dataset as an input, and the evaluation metrics and
        the saved model as outputs, of this instance's metadata execution.
        """
        # Create (or fetch) the execution up front, as the original eager
        # construction did, so a metadata problem surfaces before training.
        execution = self.exec

        (train_X, train_y), (test_X, test_y) = read_synthetic_input()
        # NOTE(review): the URIs logged below are placeholders.
        execution.log_input(metadata.DataSet(
            description="xgboost synthetic data",
            name="synthetic-data",
            owner="someone@kubeflow.org",
            uri="file://path/to/dataset",
            version="v1.0.0"))

        model = train_model(train_X,
                            train_y,
                            test_X,
                            test_y,
                            self.n_estimators,
                            self.learning_rate)

        mae = eval_model(model, test_X, test_y)
        execution.log_output(metadata.Metrics(
            name="xgboost-synthetic-traing-eval",
            owner="someone@kubeflow.org",
            description="training evaluation for xgboost synthetic",
            uri="gcs://path/to/metrics",
            metrics_type=metadata.Metrics.VALIDATION,
            values={"mean_absolute_error": mae}))

        save_model(model, self.model_file)
        execution.log_output(metadata.Model(
            name="housing-price-model",
            description="housing price prediction model using synthetic data",
            owner="someone@kubeflow.org",
            uri=self.model_file,
            model_type="linear_regression",
            training_framework={
                "name": "xgboost",
                "version": "0.9.0"
            },
            hyperparameters={
                "learning_rate": self.learning_rate,
                "n_estimators": self.n_estimators
            },
            version=datetime.utcnow().isoformat("T")))

    def predict(self, X, feature_names):
        """Predict using the model for the given ndarray.

        The model is loaded from ``self.model_file`` on the first call.

        Args:
          X: Feature rows to score.
          feature_names: Not used here; presumably kept because the serving
            wrapper passes it — TODO confirm.

        Returns:
          A one-row nested list ``[[p0, p1]]`` with the first two predictions.
        """
        if self.model is None:
            self.model = joblib.load(self.model_file)
        # Do any preprocessing
        # Pass X positionally: later xgboost releases renamed the ``data``
        # keyword of XGBRegressor.predict, which makes ``data=X`` fail there.
        prediction = self.model.predict(X)
        # Do any postprocessing
        # NOTE(review): only the first two predictions are returned, and an X
        # with fewer than two rows raises IndexError here.
        return [[prediction.item(0), prediction.item(1)]]

    def create_execution(self):
        """Create a Kubeflow metadata execution for a training run.

        Returns:
          A ``metadata.Execution`` attached to a newly created run in the
          ``xgboost-synthetic`` workspace.
        """
        workspace = metadata.Workspace(
            # Connect to metadata-service in namespace kubeflow in k8s cluster.
            backend_url_prefix="metadata-service.kubeflow:8080",
            name="xgboost-synthetic",
            description="workspace for xgboost-synthetic artifacts and executions")

        r = metadata.Run(
            workspace=workspace,
            name="xgboost-synthetic-faring-run" + datetime.utcnow().isoformat("T"),
            description="a notebook run")

        return metadata.Execution(
            name="execution" + datetime.utcnow().isoformat("T"),
            workspace=workspace,
            run=r,
            description="execution for training xgboost-synthetic")

## Train your Model Locally

* Train your model locally inside your notebook

In [39]:
# Train inside the notebook and write the model to mockup-model.dat.
local_trainer = ModelServe(model_file="mockup-model.dat")
local_trainer.train()

model_file=mockup-model.dat
[0]	validation_0-rmse:145.743
Will train until validation_0-rmse hasn't improved in 40 rounds.
[1]	validation_0-rmse:137.786
[2]	validation_0-rmse:129.221
[3]	validation_0-rmse:122.795
[4]	validation_0-rmse:117.913
[5]	validation_0-rmse:113.441
[6]	validation_0-rmse:108.843
[7]	validation_0-rmse:104.968
[8]	validation_0-rmse:101.756
[9]	validation_0-rmse:98.9659
[10]	validation_0-rmse:96.2215
[11]	validation_0-rmse:93.6806
[12]	validation_0-rmse:90.5423
[13]	validation_0-rmse:88.1216
[14]	validation_0-rmse:85.4835
[15]	validation_0-rmse:83.1785
[16]	validation_0-rmse:80.9087
[17]	validation_0-rmse:78.916
[18]	validation_0-rmse:77.5187
[19]	validation_0-rmse:75.0274
[20]	validation_0-rmse:74.0297
[21]	validation_0-rmse:72.1579
[22]	validation_0-rmse:70.6119
[23]	validation_0-rmse:69.7389
[24]	validation_0-rmse:67.9469
[25]	validation_0-rmse:66.8921
[26]	validation_0-rmse:66.1554
[27]	validation_0-rmse:64.6994
[28]	validation_0-rmse:63.5188
[29]	validation_0-r

mean_absolute_error=41.16
Model export success: mockup-model.dat


Best RMSE on eval: %.2f with %d rounds 53.609386 50


## Predict locally

* Run prediction inside the notebook using the newly trained model

In [8]:
# Generate synthetic data and score its test split with the saved model file.
(train_X, train_y), (test_X, test_y) = read_synthetic_input()
local_server = ModelServe()
local_server.predict(test_X, feature_names=None)

model_file not supplied; using the default
model_file=mockup-model.dat


[[68.33491516113281, 68.33491516113281]]

## Use Fairing to Launch a K8s Job to train your model

### Set up Kubeflow Fairing for training and predictions

Import the `fairing` library and configure the environment that your training or prediction job will run in.

In [9]:
# Output images are pushed to Google Container Registry (GCR); any other
# Docker registry can be used instead.
GCP_PROJECT = fairing.cloud.gcp.guess_project_name()
print(GCP_PROJECT)
DOCKER_REGISTRY = "gcr.io/%s/fairing-job" % GCP_PROJECT
print(DOCKER_REGISTRY)

# Python version of this kernel as "major.minor.micro".
PY_VERSION = ".".join(map(str, sys.version_info[:3]))
BASE_IMAGE = "python:%s" % PY_VERSION  # not referenced by the builder cells below

# Base image actually passed to the cluster builder; it can be rebuilt with the
# Dockerfile in this repo.
base_image = "gcr.io/kubeflow-images-public/xgboost-fairing-example-base:v-20190612"


issue-label-bot-dev
gcr.io/issue-label-bot-dev/fairing-job


## Use fairing to build the docker image

* This uses the append builder to rapidly build docker images

In [10]:
from fairing.builders import cluster

# Convert this notebook into a Fire-based script exposing ModelServe; cells
# marked "# fairing:include-cell" are the ones carried into the image.
preprocessor = ConvertNotebookPreprocessorWithFire("ModelServe")

# Extra files shipped next to the converted script. This assignment replaces
# whatever input files the preprocessor had, so the former "default to an
# empty set" step was dead code and has been dropped.
input_files = ["xgboost_util.py", "mockup-model.dat"]
preprocessor.input_files = {os.path.normpath(f) for f in input_files}
preprocessor.preprocess()

[PosixPath('build-train-deploy.py'), 'xgboost_util.py', 'mockup-model.dat']

In [11]:
# Build the base training image inside the cluster, using a GCS build-context
# source and attaching GCP credentials to the builder pod if they exist.
cluster_builder = cluster.cluster.ClusterBuilder(
    registry=DOCKER_REGISTRY,
    base_image=base_image,
    namespace='kubeflow',
    preprocessor=preprocessor,
    pod_spec_mutators=[fairing.cloud.gcp.add_gcp_credentials_if_exists],
    context_source=cluster.gcs_context.GCSContextSource(),
)
cluster_builder.build()

Building image using cluster builder.
Creating docker context: /tmp/fairing_context_5d629kor
Waiting for fairing-builder-lz9zx to start...
Pod started running True


[36mINFO[0m[0000] Downloading base image gcr.io/kubeflow-images-public/xgboost-fairing-example-base:v-20190612
[36mINFO[0m[0000] Downloading base image gcr.io/kubeflow-images-public/xgboost-fairing-example-base:v-20190612
[33mWARN[0m[0000] Error while retrieving image from cache: getting image from path: open /cache/sha256:f90e54e312c4cfba28bec6993add2a85b4e127b52149ec0aaf41e5f8889a4086: no such file or directory
[36mINFO[0m[0000] Checking for cached layer gcr.io/issue-label-bot-dev/fairing-job/fairing-job/cache:e46cfa04f5f0d0445ce3ce8b91886d94e96f2875510a69aa9afaeb0ba9e62fc4...
[36mINFO[0m[0000] Using caching version of cmd: RUN if [ -e requirements.txt ];then pip install --no-cache -r requirements.txt; fi
[36mINFO[0m[0000] Using files from context: [/kaniko/buildcontext/app]
[36mINFO[0m[0000] Taking snapshot of full filesystem...
[36mINFO[0m[0000] Skipping paths under /dev, as it is a whitelisted directory
[36mINFO[0m[0000] Skipping paths under /etc/secrets, as it i

In [12]:
# Append the preprocessed files onto the image produced by the cluster builder.
builder = append.append.AppendBuilder(
    registry=DOCKER_REGISTRY,
    base_image=cluster_builder.image_tag,
    preprocessor=preprocessor,
)
builder.build()


Building image using Append builder...
Creating docker context: /tmp/fairing_context_xpzlon_h
build-train-deploy.py already exists in Fairing context, skipping...
Loading Docker credentials for repository 'gcr.io/issue-label-bot-dev/fairing-job/fairing-job:E480ACAF'
Invoking 'docker-credential-gcloud' to obtain Docker credentials.
Successfully obtained Docker credentials.
Image successfully built in 1.2515304939588532s.
Pushing image gcr.io/issue-label-bot-dev/fairing-job/fairing-job:DA1D5CB0...
Loading Docker credentials for repository 'gcr.io/issue-label-bot-dev/fairing-job/fairing-job:DA1D5CB0'
Invoking 'docker-credential-gcloud' to obtain Docker credentials.
Successfully obtained Docker credentials.
Uploading gcr.io/issue-label-bot-dev/fairing-job/fairing-job:DA1D5CB0
Layer sha256:9d866f8bde2a0d607a6d17edc0fbd5e00b58306efc2b0a57e0ba72f269e7c6be exists, skipping
Layer sha256:124c757242f88002a858c23fc79f8262f9587fa30fd92507e586ad074afb42b6 exists, skipping
Layer sha256:bbf0f5f91e8108

## Launch the K8s Job

* Use a pod spec mutator to attach GCP credentials to the pod

In [19]:
# Kubernetes namespace the training job is created in.
NAMESPACE = "user1"

pod_spec = builder.generate_pod_spec()
train_deployer = job.job.Job(
    namespace=NAMESPACE,
    cleanup=False,  # leave the job in place; the next cell inspects it
    pod_spec_mutators=[fairing.cloud.gcp.add_gcp_credentials_if_exists],
)

# Fire sub-command for the converted script: run ModelServe().train().
pod_spec.containers[0].command.append("train")
result = train_deployer.deploy(pod_spec)

INFO:fairing.kubernetes.manager:Pod started running True


model_file not supplied; using the default
model_file=mockup-model.dat
[0]	validation_0-rmse:90.6249
Will train until validation_0-rmse hasn't improved in 40 rounds.
[1]	validation_0-rmse:85.3672
[2]	validation_0-rmse:80.6077
[3]	validation_0-rmse:75.9867
[4]	validation_0-rmse:72.15
[5]	validation_0-rmse:68.4247
[6]	validation_0-rmse:65.4166
[7]	validation_0-rmse:62.7606
[8]	validation_0-rmse:60.1438
[9]	validation_0-rmse:57.9401
[10]	validation_0-rmse:55.8747
[11]	validation_0-rmse:53.957
[12]	validation_0-rmse:52.2249
[13]	validation_0-rmse:50.556
[14]	validation_0-rmse:49.2282
[15]	validation_0-rmse:47.8585
[16]	validation_0-rmse:46.6933
[17]	validation_0-rmse:45.5335
[18]	validation_0-rmse:44.3206
[19]	validation_0-rmse:43.2371
[20]	validation_0-rmse:42.5117
[21]	validation_0-rmse:41.6298
[22]	validation_0-rmse:40.9242
[23]	validation_0-rmse:40.1302
[24]	validation_0-rmse:39.4707
[25]	validation_0-rmse:38.8031
[26]	validation_0-rmse:38.3108
[27]	validation_0-rmse:37.689
[28]	valida

In [20]:
!kubectl get jobs -l fairing-id={train_deployer.job_id} -o yaml

apiVersion: v1
items:
- apiVersion: batch/v1
  kind: Job
  metadata:
    creationTimestamp: "2019-06-12T20:21:53Z"
    generateName: fairing-job-
    labels:
      fairing-deployer: job
      fairing-id: b7955e0a-8d4f-11e9-9207-96ec34699c76
    name: fairing-job-t429t
    namespace: user1
    resourceVersion: "7556018"
    selfLink: /apis/batch/v1/namespaces/user1/jobs/fairing-job-t429t
    uid: b7b87f19-8d4f-11e9-b008-42010a8e01a5
  spec:
    backoffLimit: 0
    completions: 1
    parallelism: 1
    selector:
      matchLabels:
        controller-uid: b7b87f19-8d4f-11e9-b008-42010a8e01a5
    template:
      metadata:
        creationTimestamp: null
        labels:
          controller-uid: b7b87f19-8d4f-11e9-b008-42010a8e01a5
          fairing-deployer: job
          fairing-id: b7955e0a-8d4f-11e9-9207-96ec34699c76
          job-name: fairing-job-t429t
        name: fairing-deployer
      spec:
        containers:
        - command:
          - python

## Deploy the trained model to Kubeflow for predictions

In [21]:
from fairing.deployers import serving

pod_spec = builder.generate_pod_spec()

# The serving class is named "<module>.ModelServe", where <module> is the
# converted script's file name without its extension.
module_name, _ = os.path.splitext(preprocessor.executable.name)
deployer = serving.serving.Serving(
    "{}.ModelServe".format(module_name),
    service_type="ClusterIP",
    labels={"app": "mockup"},
)
url = deployer.deploy(pod_spec)

INFO:root:Cluster endpoint: http://fairing-service-jjgxd.user1.svc.cluster.local


In [22]:
# Dump the Deployment created by the serving deployer.
# NOTE(review): no -n flag, so this relies on kubectl's current namespace being
# the one the deployer used — TODO confirm.
!kubectl get deploy -o yaml {deployer.deployment.metadata.name}

apiVersion: extensions/v1beta1
kind: Deployment
metadata:
  annotations:
    deployment.kubernetes.io/revision: "1"
  creationTimestamp: "2019-06-12T20:22:27Z"
  generateName: fairing-deployer-
  generation: 1
  labels:
    app: mockup
    fairing-deployer: serving
    fairing-id: cbc0e610-8d4f-11e9-9207-96ec34699c76
  name: fairing-deployer-cltbb
  namespace: user1
  resourceVersion: "7556174"
  selfLink: /apis/extensions/v1beta1/namespaces/user1/deployments/fairing-deployer-cltbb
  uid: cbc54e8f-8d4f-11e9-b008-42010a8e01a5
spec:
  progressDeadlineSeconds: 600
  replicas: 1
  revisionHistoryLimit: 10
  selector:
    matchLabels:
      app: mockup
      fairing-deployer: serving
      fairing-id: cbc0e610-8d4f-11e9-9207-96ec34699c76
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      creationTimestamp: null
      labels:
        app: mockup
        fairing-deployer: serv

## Call the prediction endpoint

Create a test dataset, then call the endpoint on Kubeflow for predictions.

In [23]:
# Synthetic data used to query the deployed endpoint.
train_split, test_split = read_synthetic_input()
train_X, train_y = train_split
test_X, test_y = test_split


In [24]:
# Prediction route on port 5000 of the service URL returned by the deployer.
full_url = "%s:5000/predict" % url
result = util.predict_nparray(full_url, test_X)
# NOTE(review): the HTTP status is not checked, so an error page (e.g. a 500)
# is printed here as if it were a prediction.
pprint.pprint(result.content)

(b'<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 3.2 Final//EN">\n<title>500 Inter'
 b'nal Server Error</title>\n<h1>Internal Server Error</h1>\n<p>The server en'
 b'countered an internal error and was unable to complete your request. Either '
 b'the server is overloaded or there is an error in the application.</p>\n')


## Clean up the prediction endpoint

Delete the prediction endpoint created by this notebook.

In [33]:
# !kubectl delete service -l app=ames
# !kubectl delete deploy -l app=ames

## Build a simple 1 step pipeline

In [25]:
# Kubeflow Pipelines experiment that the pipeline run submitted below is filed under.
EXPERIMENT_NAME = 'MockupModel'

#### Define the pipeline
The pipeline function must be decorated with the `@dsl.pipeline` decorator.

In [26]:
@dsl.pipeline(
    name='Training pipeline',
    description='A pipeline that trains an xgboost model on synthetic data.'
)
def train_pipeline():
    """Single-step pipeline that runs the converted script's ``train`` command.

    The step uses the image produced by the append builder, so it runs the same
    ``ModelServe().train()`` code as the fairing training job.
    """
    command = ["python", preprocessor.executable.name, "train"]
    train_op = dsl.ContainerOp(
        name="train",
        image=builder.image_tag,
        command=command,
    ).apply(gcp.use_gcp_secret('user-gcp-sa'))
    # The script is referenced by bare file name, so run from the directory
    # holding it (presumably /app in the built image — TODO confirm).
    train_op.container.working_dir = "/app"

#### Compile the pipeline

In [27]:
# Compile the pipeline function into a package that can be submitted to KFP.
pipeline_func = train_pipeline
pipeline_filename = "%s.pipeline.zip" % pipeline_func.__name__
compiler.Compiler().compile(pipeline_func, pipeline_filename)

#### Submit the pipeline for execution

In [28]:
# The pipeline takes no parameters.
arguments = {}
run_name = "{} run".format(pipeline_func.__name__)

# Get or create the experiment, then submit a run of the compiled package.
client = kfp.Client()
experiment = client.create_experiment(EXPERIMENT_NAME)
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)

# The link shown in this cell's output leads to the run information page.
# (Note: there is a bug in JupyterLab that modifies the URL and makes the link stop working.)

INFO:root:Creating experiment MockupModel.
