mirror of https://github.com/kubeflow/examples.git
Github Issue Summarization - Train using TFJob (#55)
* Github Issue Summarization - Train using TFJob
* Create a Dockerfile to build the image for tf-job
* Create a manifest to deploy the tf-job
* Create instructions on how to do all of this

Fixes https://github.com/kubeflow/examples/issues/43

* Address comments
* Add gcloud commands
* Add ks app
* Update Dockerfile base image
* Python train.py fixes
* Remove tfjob.yaml as it is replaced by ksonnet app
* Remove plot_model_history as it is not required for tfjob training
* Don't change WORKDIR
* Address reviewer comments
* Fix links
* Fix lint issues using yapf
* Sort imports
parent 41372c9314
commit b24152cf06
@@ -27,7 +27,9 @@ By the end of this tutorial, you should learn how to:
 ## Steps:

 1. [Setup a Kubeflow cluster](setup_a_kubeflow_cluster.md)
-1. [Training the model](training_the_model.md)
+1. Training the model. You can train the model either using Jupyter Notebook or using TFJob.
+    1. [Training the model using a Jupyter Notebook](training_the_model.md)
+    1. [Training the model using TFJob](training_the_model_tfjob.md)
 1. [Serving the model](serving_the_model.md)
 1. [Querying the model](querying_the_model.md)
 1. [Teardown](teardown.md)
@@ -0,0 +1,3 @@
FROM gcr.io/kubeflow-images-staging/tensorflow-1.6.0-notebook-cpu
COPY tf-job/train.py /workdir/train.py
COPY seq2seq_utils.py /workdir/seq2seq_utils.py
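Before wiring this image into a TFJob it can help to sanity-check it locally; a minimal sketch, assuming it is run from the `notebooks/` directory and that the local tag `tf-job-issue-summarization:latest` is only a placeholder:

```commandline
# Build from notebooks/ so the COPY paths (tf-job/train.py, seq2seq_utils.py) resolve.
docker build . -t tf-job-issue-summarization:latest

# Confirm the training code landed where the TFJob component expects it;
# --entrypoint guards against the base notebook image defining its own entrypoint.
docker run --rm --entrypoint ls tf-job-issue-summarization:latest /workdir
```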
@@ -0,0 +1,39 @@
apiVersion: 0.1.0
gitVersion:
  commitSha: 40285d8a14f1ac5787e405e1023cf0c07f6aa28c
  refSpec: master
kind: ksonnet.io/registry
libraries:
  apache:
    path: apache
    version: master
  efk:
    path: efk
    version: master
  mariadb:
    path: mariadb
    version: master
  memcached:
    path: memcached
    version: master
  mongodb:
    path: mongodb
    version: master
  mysql:
    path: mysql
    version: master
  nginx:
    path: nginx
    version: master
  node:
    path: node
    version: master
  postgres:
    path: postgres
    version: master
  redis:
    path: redis
    version: master
  tomcat:
    path: tomcat
    version: master
@@ -0,0 +1,18 @@
apiVersion: 0.1.0
environments:
  default:
    destination:
      namespace: namespace
      server: https://1.2.3.4
    k8sVersion: v1.7.0
    path: default
kind: ksonnet.io/app
name: ks-app
registries:
  incubator:
    gitVersion:
      commitSha: 40285d8a14f1ac5787e405e1023cf0c07f6aa28c
      refSpec: master
    protocol: github
    uri: github.com/ksonnet/parts/tree/master/incubator
version: 0.0.1
@@ -0,0 +1,13 @@
{
  global: {
    // User-defined global parameters; accessible to all component and environments, Ex:
    // replicas: 4,
  },
  components: {
    // Component-level parameters, defined initially from 'ks prototype use ...'
    // Each object below should correspond to a component in the components/ directory
    tfjob: {

    },
  },
}
@@ -0,0 +1,7 @@
local env = std.extVar("__ksonnet/environments");
local params = std.extVar("__ksonnet/params").components["tfjob"];
local k = import "k.libsonnet";

local tfjob = import "tfjob.libsonnet";

std.prune(k.core.v1.list.new([tfjob.parts(params)]))
@@ -0,0 +1,67 @@
{
  parts(params):: {
    apiVersion: "kubeflow.org/v1alpha1",
    kind: "TFJob",
    metadata: {
      name: "tf-job-issue-summarization",
      namespace: params.namespace,
    },
    spec: {
      replicaSpecs: [
        {
          replicas: 1,
          template: {
            spec: {
              containers: [
                {
                  image: params.image,
                  name: "tensorflow",
                  volumeMounts: [
                    {
                      name: "gcp-credentials",
                      mountPath: "/secret/gcp-credentials",
                      readOnly: true,
                    },
                  ],
                  command: [
                    "python",
                  ],
                  args: [
                    "/workdir/train.py",
                    "--sample_size=" + params.sample_size,
                    "--input_data_gcs_bucket=" + params.input_data_gcs_bucket,
                    "--input_data_gcs_path=" + params.input_data_gcs_path,
                    "--output_model_gcs_bucket=" + params.output_model_gcs_bucket,
                    "--output_model_gcs_path=" + params.output_model_gcs_path,
                  ],
                  env: [
                    {
                      name: "GOOGLE_APPLICATION_CREDENTIALS",
                      value: "/secret/gcp-credentials/key.json",
                    },
                  ],
                },
              ],
              volumes: [
                {
                  name: "gcp-credentials",
                  secret: {
                    secretName: "gcp-credentials",
                  },
                },
              ],
              restartPolicy: "OnFailure",
            },
          },
          tfReplicaType: "MASTER",
        },
      ],
      terminationPolicy: {
        chief: {
          replicaIndex: 0,
          replicaName: "MASTER",
        },
      },
    },
  },
}
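To inspect the TFJob manifest this component expands to before submitting anything to the cluster, ksonnet can render it locally; a minimal sketch, assuming the `tfjob` environment and parameters created in the instructions further below:

```commandline
cd notebooks/tf-job/ks-app

# Render the tfjob component for the tfjob environment without applying it.
ks show tfjob -c tfjob
```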
@@ -0,0 +1,4 @@
local components = std.extVar("__ksonnet/components");
components {
  // Insert user-specified overrides here.
}
@@ -0,0 +1,7 @@
local base = import "base.libsonnet";
local k = import "k.libsonnet";

base {
  // Insert user-specified overrides here. For example if a component is named "nginx-deployment", you might have something like:
  //   "nginx-deployment"+: k.deployment.mixin.metadata.labels({foo: "bar"})
}
@@ -0,0 +1,10 @@
local params = import "../../components/params.libsonnet";
params {
  components+: {
    // Insert component parameter overrides here. Ex:
    // guestbook +: {
    //   name: "guestbook-dev",
    //   replicas: params.global.replicas,
    // },
  },
}
@@ -0,0 +1,80 @@
local k8s = import "k8s.libsonnet";

local apps = k8s.apps;
local core = k8s.core;
local extensions = k8s.extensions;

local hidden = {
  mapContainers(f):: {
    local podContainers = super.spec.template.spec.containers,
    spec+: {
      template+: {
        spec+: {
          // IMPORTANT: This overwrites the 'containers' field
          // for this deployment.
          containers: std.map(f, podContainers),
        },
      },
    },
  },

  mapContainersWithName(names, f)::
    local nameSet =
      if std.type(names) == "array"
      then std.set(names)
      else std.set([names]);
    local inNameSet(name) = std.length(std.setInter(nameSet, std.set([name]))) > 0;
    self.mapContainers(
      function(c)
        if std.objectHas(c, "name") && inNameSet(c.name)
        then f(c)
        else c
    ),
};

k8s {
  apps:: apps {
    v1beta1:: apps.v1beta1 {
      local v1beta1 = apps.v1beta1,

      daemonSet:: v1beta1.daemonSet {
        mapContainers(f):: hidden.mapContainers(f),
        mapContainersWithName(names, f):: hidden.mapContainersWithName(names, f),
      },

      deployment:: v1beta1.deployment {
        mapContainers(f):: hidden.mapContainers(f),
        mapContainersWithName(names, f):: hidden.mapContainersWithName(names, f),
      },
    },
  },

  core:: core {
    v1:: core.v1 {
      list:: {
        new(items)::
          { apiVersion: "v1" } +
          { kind: "List" } +
          self.items(items),

        items(items):: if std.type(items) == "array" then { items+: items } else { items+: [items] },
      },
    },
  },

  extensions:: extensions {
    v1beta1:: extensions.v1beta1 {
      local v1beta1 = extensions.v1beta1,

      daemonSet:: v1beta1.daemonSet {
        mapContainers(f):: hidden.mapContainers(f),
        mapContainersWithName(names, f):: hidden.mapContainersWithName(names, f),
      },

      deployment:: v1beta1.deployment {
        mapContainers(f):: hidden.mapContainers(f),
        mapContainersWithName(names, f):: hidden.mapContainersWithName(names, f),
      },
    },
  },
}
File diff suppressed because it is too large
File diff suppressed because it is too large
@@ -0,0 +1,205 @@
"""Train the github-issue-summarization model
train.py trains the github-issue-summarization model.

It reads the input data from GCS in a zip file format.
--input_data_gcs_bucket and --input_data_gcs_path specify
the location of input data.

It writes the model back to GCS.
--output_model_gcs_bucket and --output_model_gcs_path specify
the location of output.

It also has parameters which control the training, like
--learning_rate and --sample_size

"""
import argparse
import logging
import zipfile

from google.cloud import storage  # pylint: disable=no-name-in-module
import dill as dpickle
import numpy as np
import pandas as pd
from keras import optimizers
from keras.layers import GRU, BatchNormalization, Dense, Embedding, Input
from keras.models import Model
from sklearn.model_selection import train_test_split

from ktext.preprocess import processor
from seq2seq_utils import load_encoder_inputs, load_text_processor


def main():  # pylint: disable=too-many-statements
  # Parsing flags.
  parser = argparse.ArgumentParser()
  parser.add_argument("--sample_size", type=int, default=2000000)
  parser.add_argument("--learning_rate", default="0.001")

  parser.add_argument(
      "--input_data_gcs_bucket", type=str, default="kubeflow-examples")
  parser.add_argument(
      "--input_data_gcs_path",
      type=str,
      default="github-issue-summarization-data/github-issues.zip")

  parser.add_argument(
      "--output_model_gcs_bucket", type=str, default="kubeflow-examples")
  parser.add_argument(
      "--output_model_gcs_path",
      type=str,
      default="github-issue-summarization-data/output_model.h5")

  parser.add_argument(
      "--output_body_preprocessor_dpkl",
      type=str,
      default="body_preprocessor.dpkl")
  parser.add_argument(
      "--output_title_preprocessor_dpkl",
      type=str,
      default="title_preprocessor.dpkl")
  parser.add_argument(
      "--output_train_title_vecs_npy", type=str, default="train_title_vecs.npy")
  parser.add_argument(
      "--output_train_body_vecs_npy", type=str, default="train_body_vecs.npy")
  parser.add_argument("--output_model_h5", type=str, default="output_model.h5")

  args = parser.parse_args()
  logging.info(args)

  learning_rate = float(args.learning_rate)

  pd.set_option('display.max_colwidth', 500)

  bucket = storage.Bucket(storage.Client(), args.input_data_gcs_bucket)
  storage.Blob(args.input_data_gcs_path,
               bucket).download_to_filename('github-issues.zip')

  zip_ref = zipfile.ZipFile('github-issues.zip', 'r')
  zip_ref.extractall('.')
  zip_ref.close()

  # Read in the data and take a sample (2M rows by default, for speed of tutorial).
  traindf, testdf = train_test_split(
      pd.read_csv('github_issues.csv').sample(n=args.sample_size), test_size=.10)

  # Print stats about the shape of the data.
  logging.info('Train: %d rows %d columns', traindf.shape[0], traindf.shape[1])
  logging.info('Test: %d rows %d columns', testdf.shape[0], testdf.shape[1])

  train_body_raw = traindf.body.tolist()
  train_title_raw = traindf.issue_title.tolist()

  # Clean, tokenize, and apply padding / truncating such that each document
  # length = 70. Also, retain only the top 8,000 words in the vocabulary and set
  # the remaining words to 1, which becomes a common index for rare words.
  body_pp = processor(keep_n=8000, padding_maxlen=70)
  train_body_vecs = body_pp.fit_transform(train_body_raw)

  logging.info('Example original body: %s', train_body_raw[0])
  logging.info('Example body after pre-processing: %s', train_body_vecs[0])

  # Instantiate a text processor for the titles, with some different parameters.
  title_pp = processor(
      append_indicators=True, keep_n=4500, padding_maxlen=12, padding='post')

  # Process the title data.
  train_title_vecs = title_pp.fit_transform(train_title_raw)

  logging.info('Example original title: %s', train_title_raw[0])
  logging.info('Example title after pre-processing: %s', train_title_vecs[0])

  # Save the preprocessors.
  with open(args.output_body_preprocessor_dpkl, 'wb') as f:
    dpickle.dump(body_pp, f)

  with open(args.output_title_preprocessor_dpkl, 'wb') as f:
    dpickle.dump(title_pp, f)

  # Save the processed data.
  np.save(args.output_train_title_vecs_npy, train_title_vecs)
  np.save(args.output_train_body_vecs_npy, train_body_vecs)

  _, doc_length = load_encoder_inputs(
      args.output_train_body_vecs_npy)

  num_encoder_tokens, body_pp = load_text_processor(
      args.output_body_preprocessor_dpkl)
  num_decoder_tokens, title_pp = load_text_processor(
      args.output_title_preprocessor_dpkl)

  # Arbitrarily set latent dimension for embedding and hidden units.
  latent_dim = 300

  ###############
  # Encoder Model.
  ###############
  encoder_inputs = Input(shape=(doc_length,), name='Encoder-Input')

  # Word embedding for encoder (ex: Issue Body)
  x = Embedding(
      num_encoder_tokens, latent_dim, name='Body-Word-Embedding',
      mask_zero=False)(encoder_inputs)
  x = BatchNormalization(name='Encoder-Batchnorm-1')(x)

  # We do not need the `encoder_output`, just the hidden state.
  _, state_h = GRU(latent_dim, return_state=True, name='Encoder-Last-GRU')(x)

  # Encapsulate the encoder as a separate entity so we can just
  # encode without decoding if we want to.
  encoder_model = Model(
      inputs=encoder_inputs, outputs=state_h, name='Encoder-Model')

  seq2seq_encoder_out = encoder_model(encoder_inputs)

  ################
  # Decoder Model.
  ################
  decoder_inputs = Input(
      shape=(None,), name='Decoder-Input')  # for teacher forcing

  # Word embedding for decoder (ex: Issue Titles)
  dec_emb = Embedding(
      num_decoder_tokens,
      latent_dim,
      name='Decoder-Word-Embedding',
      mask_zero=False)(decoder_inputs)
  dec_bn = BatchNormalization(name='Decoder-Batchnorm-1')(dec_emb)

  # Set up the decoder, using `decoder_state_input` as initial state.
  decoder_gru = GRU(
      latent_dim, return_state=True, return_sequences=True, name='Decoder-GRU')
  decoder_gru_output, _ = decoder_gru(dec_bn, initial_state=seq2seq_encoder_out)
  x = BatchNormalization(name='Decoder-Batchnorm-2')(decoder_gru_output)

  # Dense layer for prediction.
  decoder_dense = Dense(
      num_decoder_tokens, activation='softmax', name='Final-Output-Dense')
  decoder_outputs = decoder_dense(x)

  ################
  # Seq2Seq Model.
  ################

  seq2seq_Model = Model([encoder_inputs, decoder_inputs], decoder_outputs)

  seq2seq_Model.compile(
      optimizer=optimizers.Nadam(lr=learning_rate),
      loss='sparse_categorical_crossentropy')

  seq2seq_Model.summary()

  #############
  # Save model.
  #############
  seq2seq_Model.save(args.output_model_h5)

  ######################
  # Upload model to GCS.
  ######################
  bucket = storage.Bucket(storage.Client(), args.output_model_gcs_bucket)
  storage.Blob(args.output_model_gcs_path, bucket).upload_from_filename(
      args.output_model_h5)


if __name__ == '__main__':
  main()
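Before baking train.py into the image, it can be useful to exercise it directly on a small sample; a rough sketch, assuming the Python dependencies (keras, ktext, dill, pandas, scikit-learn, google-cloud-storage) are installed, seq2seq_utils.py is importable, and the key path and output bucket below are placeholders:

```commandline
export GOOGLE_APPLICATION_CREDENTIALS=/path/to/key.json

python train.py \
  --sample_size=200 \
  --learning_rate=0.001 \
  --input_data_gcs_bucket=kubeflow-examples \
  --input_data_gcs_path=github-issue-summarization-data/github-issues.zip \
  --output_model_gcs_bucket=<your-bucket> \
  --output_model_gcs_path=github-issue-summarization-data/output_model.h5
```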
@@ -0,0 +1,92 @@
# Training the model using TFJob

Kubeflow provides a TensorFlow job (TFJob) controller for Kubernetes, which lets you run your distributed TensorFlow training job on a Kubernetes cluster. For this training job, we read the training data from GCS and write the output model back to GCS.

## Create the image for training

The [tf-job](notebooks/tf-job) directory contains the files needed to build the training image. The [train.py](notebooks/tf-job/train.py) file contains the training code. Here is how you can build the image and push it to Google Container Registry (gcr.io):

```commandline
cd notebooks/
docker build . -t gcr.io/agwl-kubeflow/tf-job-issue-summarization:latest
gcloud docker -- push gcr.io/agwl-kubeflow/tf-job-issue-summarization:latest
```

## GCS Service account

* Create a service account which will be used to read and write data from the GCS bucket.

* Give the service account the `roles/storage.admin` role so that it can access GCS buckets.

* Download its key as a JSON file and create a secret named `gcp-credentials` with the key `key.json`:

```commandline
SERVICE_ACCOUNT=github-issue-summarization
PROJECT=kubeflow-example-project # The GCP Project name
gcloud iam service-accounts --project=${PROJECT} create ${SERVICE_ACCOUNT} \
  --display-name "GCP Service Account for use with kubeflow examples"

gcloud projects add-iam-policy-binding ${PROJECT} --member \
  serviceAccount:${SERVICE_ACCOUNT}@${PROJECT}.iam.gserviceaccount.com --role=roles/storage.admin

KEY_FILE=/home/agwl/secrets/${SERVICE_ACCOUNT}@${PROJECT}.iam.gserviceaccount.com.json
gcloud iam service-accounts keys create ${KEY_FILE} \
  --iam-account ${SERVICE_ACCOUNT}@${PROJECT}.iam.gserviceaccount.com

kubectl --namespace=${NAMESPACE} create secret generic gcp-credentials --from-file=key.json="${KEY_FILE}"
```
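A quick way to confirm the secret was created with the expected `key.json` entry:

```commandline
kubectl --namespace=${NAMESPACE} describe secret gcp-credentials
```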
## Run the TFJob using your image

[tf-job](notebooks/tf-job) contains a ksonnet app ([ks-app](notebooks/tf-job/ks-app)) that deploys the TFJob.

Create an environment in which to deploy the ksonnet app:

```commandline
cd notebooks/tf-job/ks-app
ks env add tfjob --namespace ${NAMESPACE}
```

Set the appropriate params for the tfjob component:

```commandline
ks param set tfjob namespace ${NAMESPACE} --env=tfjob

# The image pushed in the previous step
ks param set tfjob image "gcr.io/agwl-kubeflow/tf-job-issue-summarization:latest" --env=tfjob

# Sample size for training
ks param set tfjob sample_size 100000 --env=tfjob

# Set the input and output GCS bucket locations
ks param set tfjob input_data_gcs_bucket "kubeflow-examples" --env=tfjob
ks param set tfjob input_data_gcs_path "github-issue-summarization-data/github-issues.zip" --env=tfjob
ks param set tfjob output_model_gcs_bucket "kubeflow-examples" --env=tfjob
ks param set tfjob output_model_gcs_path "github-issue-summarization-data/output_model.h5" --env=tfjob
```

Deploy the app:

```commandline
ks apply tfjob -c tfjob
```
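Once applied, you can also inspect the TFJob resource itself; a quick check, assuming the Kubeflow TFJob CRD exposes the `tfjobs` resource name:

```commandline
kubectl get tfjobs -n=${NAMESPACE}
kubectl describe tfjob tf-job-issue-summarization -n=${NAMESPACE}
```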
|
||||
|
||||
In a while you should see a new pod with the label `tf_job_name=tf-job-issue-summarization`
|
||||
```commandline
|
||||
kubectl get pods -n=${NAMESPACE} -ltf_job_name=tf-job-issue-summarization
|
||||
```
|
||||
|
||||
You can view the logs of the tf-job operator using
|
||||
|
||||
```commandline
|
||||
kubectl logs -f $(kubectl get pods -n=${NAMESPACE} -lname=tf-job-operator -o=jsonpath='{.items[0].metadata.name}')
|
||||
```
|
||||
|
||||
You can view the actual training logs using
|
||||
|
||||
```commandline
|
||||
kubectl logs -f $(kubectl get pods -n=${NAMESPACE} -ltf_job_name=tf-job-issue-summarization -o=jsonpath='{.items[0].metadata.name}')
|
||||
```
|
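When training has finished and the model has been uploaded to GCS, you can remove the TFJob with ksonnet; a sketch using the same environment and component names as above:

```commandline
ks delete tfjob -c tfjob
```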