# Digit Recognizer Kubeflow Pipeline

This notebook builds a Kubeflow pipeline for the [Kaggle Digit Recognizer competition](https://www.kaggle.com/competitions/digit-recognizer/overview). From the competition overview:

>MNIST ("Modified National Institute of Standards and Technology") is the de facto “hello world” dataset of computer vision. Since its release in 1999, this classic dataset of handwritten images has served as the basis for benchmarking classification algorithms. As new machine learning techniques emerge, MNIST remains a reliable resource for researchers and learners alike.

>In this competition, your goal is to correctly identify digits from a dataset of tens of thousands of handwritten images.

# Install relevant libraries


>Upgrade pip: `pip install --user --upgrade pip`

>Install (or upgrade) the Kubeflow Pipelines SDK: `pip install kfp --upgrade --user --quiet`

You may need to restart the notebook kernel after installing the kfp SDK.

In [14]:
!pip install --user --upgrade pip



In [15]:
!pip install kfp --upgrade --user --quiet

In [16]:
# confirm the kfp sdk
! pip show kfp

Name: kfp
Version: 1.8.11
Summary: KubeFlow Pipelines SDK
Home-page: https://github.com/kubeflow/pipelines
Author: The Kubeflow Authors
Author-email: 
License: UNKNOWN
Location: /home/jovyan/.local/lib/python3.6/site-packages
Requires: absl-py, click, cloudpickle, dataclasses, Deprecated, docstring-parser, fire, google-api-python-client, google-auth, google-cloud-storage, jsonschema, kfp-pipeline-spec, kfp-server-api, kubernetes, protobuf, pydantic, PyYAML, requests-toolbelt, strip-hints, tabulate, typer, typing-extensions, uritemplate
Required-by: kubeflow-kale


## Import kubeflow pipeline libraries

In [17]:
import kfp
import kfp.components as comp
import kfp.dsl as dsl
from kfp.components import InputPath, OutputPath
from typing import NamedTuple


## Kubeflow pipeline component creation

Component 1: download the digits dataset

In [18]:
# download data step
def download_data(download_link: str, data_path: OutputPath(str)):
    """Download the train/test CSV archives and extract them into ``data_path``.

    Kubeflow lightweight components serialize only this function's source, so
    every import lives inside the body. Only the standard library is used,
    which avoids a runtime ``pip install`` inside the container.

    Args:
        download_link: URL template containing a ``{file}`` placeholder. It is
            formatted with ``file='train'`` and ``file='test'`` to obtain the
            two archive URLs.
        data_path: Output directory (assigned by Kubeflow). It receives
            ``train_csv.zip`` and ``test_csv.zip`` plus their extracted contents.
    """
    import os
    import urllib.request
    import zipfile

    os.makedirs(data_path, exist_ok=True)

    for name in ('train', 'test'):
        archive = os.path.join(data_path, f'{name}_csv.zip')
        # urlretrieve follows HTTP redirects, which GitHub "?raw=true" links use.
        urllib.request.urlretrieve(download_link.format(file=name), archive)
        with zipfile.ZipFile(archive, 'r') as zip_ref:
            zip_ref.extractall(data_path)

    print('Done!')

Component 2: load the digits dataset

In [19]:
# load data

def load_data(data_path: InputPath(str), 
              load_data_path: OutputPath(str)):
    """Stack the raw train/test CSVs into one DataFrame and pickle it.

    Reads ``train.csv`` and ``test.csv`` from ``data_path`` and concatenates
    them (train rows first, fresh RangeIndex). It then pickles
    ``(ntrain, all_data)`` to ``<load_data_path>/all_data``, so the next step
    can recover the train rows as the first ``ntrain`` rows. If ``test.csv``
    has no ``label`` column, those rows get NaN labels in the combined frame.

    Kubeflow lightweight components serialize only this function's source, so
    every import (and the install helper) lives inside the body.
    """
    import importlib
    import os
    import pickle
    import subprocess
    import sys

    def _require(module, package=None):
        """Import ``module``, pip-installing ``package`` (default: ``module``) first if missing."""
        try:
            return importlib.import_module(module)
        except ImportError:
            # Use the running interpreter's pip so the package lands where we import from.
            subprocess.run(
                [sys.executable, '-m', 'pip', 'install', package or module],
                check=True)
            importlib.invalidate_caches()
            return importlib.import_module(module)

    pd = _require('pandas')

    train_df = pd.read_csv(os.path.join(data_path, 'train.csv'))
    test_df = pd.read_csv(os.path.join(data_path, 'test.csv'))

    # Remember how many leading rows are training rows before stacking.
    ntrain = train_df.shape[0]
    all_data = pd.concat((train_df, test_df)).reset_index(drop=True)
    print("all_data size is : {}".format(all_data.shape))

    os.makedirs(load_data_path, exist_ok=True)

    # Pickled for the preprocess component.
    with open(os.path.join(load_data_path, 'all_data'), 'wb') as f:
        pickle.dump((ntrain, all_data), f)

    print('Done!')

Component 3: preprocess the digits dataset

In [20]:
# preprocess data

def preprocess_data(load_data_path: InputPath(str), 
                    preprocess_data_path: OutputPath(str)):
    """Turn the combined frame into image tensors and split off a held-out set.

    Loads ``(ntrain, all_data)`` from ``<load_data_path>/all_data`` and keeps
    the first ``ntrain`` (labelled) rows. It reshapes the pixel columns to
    ``(-1, 28, 28, 1)``, divides by 255, and performs a 90/10 train/test split
    (``random_state=42``). Two pickles are written:

    * ``<preprocess_data_path>/train`` -> ``(X_train, y_train)``
    * ``<preprocess_data_path>/test``  -> ``(X_test, y_test)``

    Kubeflow lightweight components serialize only this function's source, so
    every import (and the install helper) lives inside the body.
    """
    import importlib
    import os
    import pickle
    import subprocess
    import sys

    def _require(module, package=None):
        """Import ``module``, pip-installing ``package`` (default: ``module``) first if missing."""
        try:
            return importlib.import_module(module)
        except ImportError:
            # Use the running interpreter's pip so the package lands where we import from.
            subprocess.run(
                [sys.executable, '-m', 'pip', 'install', package or module],
                check=True)
            importlib.invalidate_caches()
            return importlib.import_module(module)

    _require('pandas')  # required for pickle to rebuild the DataFrame
    _require('sklearn', 'scikit-learn')
    from sklearn.model_selection import train_test_split

    with open(os.path.join(load_data_path, 'all_data'), 'rb') as f:
        ntrain, all_data = pickle.load(f)

    # Only the first ntrain rows carry labels and are used below; slice them
    # off before doing any numeric work on the rest.
    labelled = all_data.iloc[:ntrain]

    # Reshape to (height=28, width=28, channel=1) images; dividing by 255
    # assumes 8-bit pixel values.
    X = labelled.drop('label', axis=1).values.reshape(-1, 28, 28, 1) / 255.0

    # When the combined frame contains rows without a label (NaN), pandas
    # stores the column as float; cast the labelled rows back to int class ids.
    y = labelled['label'].astype(int)

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42)

    os.makedirs(preprocess_data_path, exist_ok=True)

    # Training split, consumed by the modeling component.
    with open(os.path.join(preprocess_data_path, 'train'), 'wb') as f:
        pickle.dump((X_train, y_train), f)

    # Held-out split, consumed by the modeling and prediction components.
    with open(os.path.join(preprocess_data_path, 'test'), 'wb') as f:
        pickle.dump((X_test, y_test), f)

    print('Done!')

Component 4: ML modeling

In [21]:
def modeling(preprocess_data_path: InputPath(str), 
            model_path: OutputPath(str)):
    """Train a small convolutional classifier and save it as ``model.h5``.

    Reads ``(X_train, y_train)`` from ``<preprocess_data_path>/train``. It
    trains for one epoch (10% validation split, batch size 64) and prints loss
    and accuracy on ``<preprocess_data_path>/test``. The model is saved to
    ``<model_path>/model.h5``.

    Kubeflow lightweight components serialize only this function's source, so
    every import (and the install helper) lives inside the body.
    """
    import importlib
    import os
    import pickle
    import subprocess
    import sys

    def _require(module, package=None):
        """Import ``module``, pip-installing ``package`` (default: ``module``) first if missing."""
        try:
            return importlib.import_module(module)
        except ImportError:
            # Use the running interpreter's pip so the package lands where we import from.
            subprocess.run(
                [sys.executable, '-m', 'pip', 'install', package or module],
                check=True)
            importlib.invalidate_caches()
            return importlib.import_module(module)

    _require('pandas')  # the pickled labels are pandas Series
    np = _require('numpy')
    tf = _require('tensorflow')
    from tensorflow.keras.losses import SparseCategoricalCrossentropy
    from tensorflow.keras.metrics import SparseCategoricalAccuracy

    with open(os.path.join(preprocess_data_path, 'train'), 'rb') as f:
        X_train, y_train = pickle.load(f)

    # Three same-padded conv layers with dropout, then a 10-way softmax.
    hidden_dim1 = 56
    hidden_dim2 = 100
    DROPOUT = 0.5
    model = tf.keras.Sequential([
            tf.keras.layers.Conv2D(filters=hidden_dim1, kernel_size=(5, 5), padding='Same',
                                   activation='relu'),
            tf.keras.layers.Dropout(DROPOUT),
            tf.keras.layers.Conv2D(filters=hidden_dim2, kernel_size=(3, 3), padding='Same',
                                   activation='relu'),
            tf.keras.layers.Dropout(DROPOUT),
            tf.keras.layers.Conv2D(filters=hidden_dim2, kernel_size=(3, 3), padding='Same',
                                   activation='relu'),
            tf.keras.layers.Dropout(DROPOUT),
            tf.keras.layers.Flatten(),
            tf.keras.layers.Dense(10, activation="softmax")
            ])

    model.build(input_shape=(None, 28, 28, 1))

    model.compile(tf.keras.optimizers.Adam(learning_rate=0.001),
                  loss=SparseCategoricalCrossentropy(),
                  # A list is accepted by both Keras 2 and Keras 3; Keras 3
                  # rejects a bare metric object here.
                  metrics=[SparseCategoricalAccuracy(name='accuracy')])

    model.fit(np.array(X_train), np.array(y_train),
              validation_split=.1, epochs=1, batch_size=64)

    with open(os.path.join(preprocess_data_path, 'test'), 'rb') as f:
        X_test, y_test = pickle.load(f)

    test_loss, test_acc = model.evaluate(np.array(X_test), np.array(y_test), verbose=0)
    print("Test_loss: {}, Test_accuracy: {} ".format(test_loss, test_acc))

    # Create the model output directory and save the trained model into it.
    os.makedirs(model_path, exist_ok=True)
    model.save(os.path.join(model_path, 'model.h5'))

Component 5: Prediction and evaluation

In [22]:
def prediction(model_path: InputPath(str), 
                preprocess_data_path: InputPath(str), 
                mlpipeline_ui_metadata_path: OutputPath(str)) -> NamedTuple('conf_m_result', [('mlpipeline_ui_metadata', 'UI_metadata')]):
    """Predict on the held-out split and publish a confusion matrix to the Kubeflow UI.

    Args:
        model_path: Directory containing ``model.h5`` from the modeling step.
        preprocess_data_path: Directory containing the pickled ``test`` split
            ``(X_test, y_test)`` from the preprocess step.
        mlpipeline_ui_metadata_path: File that receives the UI metadata JSON,
            which holds an inline CSV confusion matrix.

    Returns:
        A ``conf_m_result`` named tuple whose only field holds the same
        metadata serialized as a JSON string.

    Kubeflow lightweight components serialize only this function's source, so
    every import (and the install helper) lives inside the body.
    """
    import importlib
    import json
    import os
    import pickle
    import subprocess
    import sys
    from collections import namedtuple

    def _require(module, package=None):
        """Import ``module``, pip-installing ``package`` (default: ``module``) first if missing."""
        try:
            return importlib.import_module(module)
        except ImportError:
            # Use the running interpreter's pip so the package lands where we import from.
            subprocess.run(
                [sys.executable, '-m', 'pip', 'install', package or module],
                check=True)
            importlib.invalidate_caches()
            return importlib.import_module(module)

    np = _require('numpy')
    pd = _require('pandas')
    _require('sklearn', 'scikit-learn')
    _require('tensorflow')
    from sklearn.metrics import confusion_matrix
    from tensorflow.keras.models import load_model

    # Digit classes 0-9, matching the UI labels emitted below.
    class_labels = list(range(10))

    with open(os.path.join(preprocess_data_path, 'test'), 'rb') as f:
        X_test, y_test = pickle.load(f)

    model = load_model(os.path.join(model_path, 'model.h5'))
    y_pred = np.argmax(model.predict(X_test), axis=-1)

    # Pin the label set. Without `labels`, the matrix covers the union of
    # classes seen in y_test and y_pred. A vocabulary taken from y_test alone
    # then overflows when the model predicts a class y_test lacks.
    cm = confusion_matrix(y_test, y_pred, labels=class_labels)

    # Flatten to (target, predicted, count) rows, the layout the UI CSV expects.
    rows = [
        (class_labels[target_index], class_labels[predicted_index], count)
        for target_index, target_row in enumerate(cm)
        for predicted_index, count in enumerate(target_row)
    ]
    df = pd.DataFrame(rows, columns=['target', 'predicted', 'count'])

    # The UI treats target/predicted as categories; emit them as integer strings.
    df[['target', 'predicted']] = df[['target', 'predicted']].astype(int).astype(str)

    metadata = {
        "outputs": [
            {
                "type": "confusion_matrix",
                "format": "csv",
                "schema": [
                    {"name": "target", "type": "CATEGORY"},
                    {"name": "predicted", "type": "CATEGORY"},
                    {"name": "count", "type": "NUMBER"},
                ],
                "source": df.to_csv(header=False, index=False),
                "storage": "inline",
                "labels": [str(label) for label in class_labels],
            }
        ]
    }

    with open(mlpipeline_ui_metadata_path, 'w') as metadata_file:
        json.dump(metadata, metadata_file)

    conf_m_result = namedtuple('conf_m_result', ['mlpipeline_ui_metadata'])
    return conf_m_result(json.dumps(metadata))

In [23]:
# Wrap each step function as a lightweight Kubeflow component. Data-handling
# steps run on a plain Python image; training and prediction run on the
# TensorFlow image.
_PYTHON_IMAGE = "python:3.7.1"
# NOTE(review): ":latest" is unpinned, so runs may change when the image is updated.
_TF_IMAGE = "tensorflow/tensorflow:latest"

download_op = comp.create_component_from_func(download_data, base_image=_PYTHON_IMAGE)
load_op = comp.create_component_from_func(load_data, base_image=_PYTHON_IMAGE)
preprocess_op = comp.create_component_from_func(preprocess_data, base_image=_PYTHON_IMAGE)
modeling_op = comp.create_component_from_func(modeling, base_image=_TF_IMAGE)
predict_op = comp.create_component_from_func(prediction, base_image=_TF_IMAGE)

The cell above wraps each Python function as a lightweight Kubeflow Pipelines component that runs in the specified base container image.

## Kubeflow pipeline creation

In [24]:
# Create a client for communicating with the Kubeflow Pipelines API server.
# No host is passed, so kfp.Client() relies on its default endpoint
# resolution; this presumably assumes the notebook runs where that default is
# reachable (e.g. inside the Kubeflow cluster) — confirm for other setups.
client = kfp.Client()

In [25]:
# define pipeline
@dsl.pipeline(name="digit-recognizer-pipeline", 
              description="Performs Preprocessing, training and prediction of digits")
def digit_recognize_pipeline(download_link: str,
                             data_path: str,
                             load_data_path: str,
                             preprocess_data_path: str,
                             model_path: str
                             ):
    # Pipeline graph: download -> load -> preprocess -> train -> predict.
    # Only `download_link` is consumed here. The four path parameters are
    # accepted as pipeline inputs but unused, because Kubeflow assigns each
    # component's output location itself.
    downloaded = download_op(download_link)
    loaded = load_op(downloaded.output)
    preprocessed = preprocess_op(loaded.output)
    trained = modeling_op(preprocessed.output)
    # Prediction needs both the trained model and the held-out split.
    predict_op(trained.output, preprocessed.output)
    

In [26]:
# Replace download_link with the repository URL where the data is stored, e.g.
#   https://github.com/<user>/<repo>/blob/<branch>/<data-dir>/{file}.csv.zip?raw=true
# The download step formats "{file}" with "train" and "test".
download_link = 'https://github.com/josepholaide/KfaaS/blob/main/kale/data/{file}.csv.zip?raw=true'
# The path values below are passed as pipeline arguments, but the pipeline
# body only consumes download_link; component output locations are assigned by
# Kubeflow.
data_path = "/mnt"
load_data_path = "load"
preprocess_data_path = "preprocess"
model_path = "model"

In [27]:
pipeline_func = digit_recognize_pipeline

experiment_name = 'digit_recognizer_lightweight'
run_name = f'{pipeline_func.__name__} run'

# Pipeline arguments, keyed by the pipeline function's parameter names.
arguments = dict(
    download_link=download_link,
    data_path=data_path,
    load_data_path=load_data_path,
    preprocess_data_path=preprocess_data_path,
    model_path=model_path,
)

# Compile the pipeline into a compressed package named after the experiment.
package_path = f'{experiment_name}.zip'
kfp.compiler.Compiler().compile(pipeline_func, package_path)

# Submit a run straight from the pipeline function.
run_result = client.create_run_from_pipeline_func(
    pipeline_func,
    experiment_name=experiment_name,
    run_name=run_name,
    arguments=arguments,
)
