mirror of https://github.com/kubeflow/examples.git
Support training using a PVC for the data. (#98)
* Support training using a PVC for the data. * This will make it easier to run the example on Katacoda and non-GCP platforms. * Modify train.py so we can use a GCS location or local file paths. * Update the Dockerfile. The jupyter Docker images and had a bunch of dependencies removed and the latest images don't have the dependencies needed to run the examples. * Creat a tfjob-pvc component that trains reading/writing using PVC and not GCP. * * Address reviewer comments * Ignore changes to the ksonnet parameters when determining whether to include dirty and sha of the diff in the image. This way we can update the ksonnet app with the newly built image without it leading to subsequent images being marked dirty. * Fix lint issues. * Fix lint import issue.
This commit is contained in:
parent
34d6f8809d
commit
4b33d44af6
|
@ -12,6 +12,12 @@ environments:
|
|||
server: https://35.188.73.10
|
||||
k8sVersion: v1.7.0
|
||||
path: default
|
||||
jlewi:
|
||||
destination:
|
||||
namespace: jlewi
|
||||
server: https://35.188.73.10
|
||||
k8sVersion: v1.7.0
|
||||
path: jlewi
|
||||
kind: ksonnet.io/app
|
||||
libraries:
|
||||
core:
|
||||
|
|
|
@ -50,6 +50,13 @@
|
|||
tfjob: {
|
||||
namespace: "null",
|
||||
},
|
||||
"tfjob-pvc": {
|
||||
image: "gcr.io/kubeflow-dev/tf-job-issue-summarization:v20180425-e79f888",
|
||||
input_data: "/data/github_issues.csv",
|
||||
namespace: "null",
|
||||
output_model: "/data/model.h5",
|
||||
sample_size: "2000000",
|
||||
},
|
||||
ui: {
|
||||
namespace: "null",
|
||||
},
|
||||
|
|
|
@ -0,0 +1,61 @@
|
|||
// Train the model reading & writing the data from a PVC.
|
||||
local env = std.extVar("__ksonnet/environments");
|
||||
local params = std.extVar("__ksonnet/params").components["tfjob-pvc"];
|
||||
local k = import "k.libsonnet";
|
||||
|
||||
local tfjob = {
|
||||
apiVersion: "kubeflow.org/v1alpha1",
|
||||
kind: "TFJob",
|
||||
metadata: {
|
||||
name: "tf-job-issue-summarization-pvc",
|
||||
namespace: env.namespace,
|
||||
},
|
||||
spec: {
|
||||
replicaSpecs: [
|
||||
{
|
||||
replicas: 1,
|
||||
template: {
|
||||
spec: {
|
||||
containers: [
|
||||
{
|
||||
image: params.image,
|
||||
name: "tensorflow",
|
||||
volumeMounts: [
|
||||
{
|
||||
name: "data",
|
||||
mountPath: "/data",
|
||||
},
|
||||
],
|
||||
command: [
|
||||
"python",
|
||||
"/workdir/train.py",
|
||||
"--sample_size=" + std.toString(params.sample_size),
|
||||
"--input_data=" + params.input_data,
|
||||
"--output_model=" + params.output_model,
|
||||
],
|
||||
},
|
||||
],
|
||||
volumes: [
|
||||
{
|
||||
name: "data",
|
||||
persistentVolumeClaim: {
|
||||
claimName: "data-pvc",
|
||||
},
|
||||
},
|
||||
],
|
||||
restartPolicy: "OnFailure",
|
||||
},
|
||||
},
|
||||
tfReplicaType: "MASTER",
|
||||
},
|
||||
],
|
||||
terminationPolicy: {
|
||||
chief: {
|
||||
replicaIndex: 0,
|
||||
replicaName: "MASTER",
|
||||
},
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
std.prune(k.core.v1.list.new([tfjob]))
|
|
@ -28,7 +28,7 @@
|
|||
],
|
||||
args: [
|
||||
"/workdir/train.py",
|
||||
"--sample_size=" + params.sample_size,
|
||||
"--sample_size=" + std.toString(params.sample_size),
|
||||
"--input_data_gcs_bucket=" + params.input_data_gcs_bucket,
|
||||
"--input_data_gcs_path=" + params.input_data_gcs_path,
|
||||
"--output_model_gcs_bucket=" + params.output_model_gcs_bucket,
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
local base = import "base.libsonnet";
|
||||
local k = import "k.libsonnet";
|
||||
|
||||
base + {
|
||||
// Insert user-specified overrides here. For example if a component is named "nginx-deployment", you might have something like:
|
||||
// "nginx-deployment"+: k.deployment.mixin.metadata.labels({foo: "bar"})
|
||||
}
|
|
@ -0,0 +1,10 @@
|
|||
local params = import "../../components/params.libsonnet";
|
||||
params + {
|
||||
components +: {
|
||||
// Insert component parameter overrides here. Ex:
|
||||
// guestbook +: {
|
||||
// name: "guestbook-dev",
|
||||
// replicas: params.global.replicas,
|
||||
// },
|
||||
},
|
||||
}
|
|
@ -1,3 +1,9 @@
|
|||
FROM gcr.io/kubeflow-images-staging/tensorflow-1.6.0-notebook-cpu
|
||||
RUN pip install ktext
|
||||
RUN pip install annoy
|
||||
RUN pip install --upgrade google-cloud
|
||||
RUN pip install sklearn h5py
|
||||
RUN pip install nltk
|
||||
|
||||
COPY train.py /workdir/train.py
|
||||
COPY seq2seq_utils.py /workdir/seq2seq_utils.py
|
||||
|
|
|
@ -0,0 +1,56 @@
|
|||
# Copyright 2017 The Kubernetes Authors.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
#
|
||||
# Requirements:
|
||||
# https://github.com/mattrobenolt/jinja2-cli
|
||||
# pip install jinja2-clie
|
||||
# Update the Airflow deployment
|
||||
|
||||
# List any changed files. We only include files in the notebooks directory.
|
||||
# because that is the code in the docker image.
|
||||
# In particular we exclude changes to the ksonnet configs.
|
||||
CHANGED_FILES := $(shell git diff-files --relative=github_issue_summarization/notebooks)
|
||||
|
||||
ifeq ($(strip $(CHANGED_FILES)),)
|
||||
# Changed files is empty; not dirty
|
||||
# Don't include --dirty because it could be dirty if files outside the ones we care
|
||||
# about changed.
|
||||
TAG := $(shell date +v%Y%m%d)-$(shell git describe --tags --always)
|
||||
else
|
||||
TAG := $(shell date +v%Y%m%d)-$(shell git describe --tags --always --dirty)-$(shell git diff | shasum -a256 | cut -c -6)
|
||||
endif
|
||||
|
||||
DIR := ${CURDIR}
|
||||
|
||||
# You can override this on the command line as
|
||||
# make PROJECT=kubeflow-examples <target>
|
||||
PROJECT := kubeflow-examples
|
||||
|
||||
IMG := gcr.io/$(PROJECT)/tf-job-issue-summarization
|
||||
|
||||
echo:
|
||||
@echo changed files $(CHANGED_FILES)
|
||||
@echo tag $(TAG)
|
||||
push: build
|
||||
gcloud docker -- push $(IMG):$(TAG)
|
||||
|
||||
set-image: push
|
||||
# Set the image to use
|
||||
cd ../ks-kubeflow && ks param set tfjob-pvc image $(IMG):$(TAG)
|
||||
|
||||
# To build without the cache set the environment variable
|
||||
# export DOCKER_BUILD_OPTS=--no-cache
|
||||
build:
|
||||
docker build ${DOCKER_BUILD_OPTS} -f Dockerfile -t $(IMG):$(TAG) ./
|
||||
@echo Built $(IMG):$(TAG)
|
|
@ -15,6 +15,8 @@ It also has parameters which control the training like
|
|||
"""
|
||||
import argparse
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
import zipfile
|
||||
|
||||
from google.cloud import storage # pylint: disable=no-name-in-module
|
||||
|
@ -29,12 +31,34 @@ from sklearn.model_selection import train_test_split
|
|||
from ktext.preprocess import processor
|
||||
from seq2seq_utils import load_encoder_inputs, load_text_processor
|
||||
|
||||
def main(): # pylint: disable=too-many-statements
|
||||
GCS_REGEX = re.compile("gs://([^/]*)(/.*)?")
|
||||
|
||||
|
||||
def split_gcs_uri(gcs_uri):
|
||||
"""Split a GCS URI into bucket and path."""
|
||||
m = GCS_REGEX.match(gcs_uri)
|
||||
bucket = m.group(1)
|
||||
path = ""
|
||||
if m.group(2):
|
||||
path = m.group(2).lstrip("/")
|
||||
return bucket, path
|
||||
|
||||
|
||||
def main(): # pylint: disable=too-many-statements
|
||||
# Parsing flags.
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--sample_size", type=int, default=2000000)
|
||||
parser.add_argument("--learning_rate", default="0.001")
|
||||
|
||||
parser.add_argument(
|
||||
"--input_data",
|
||||
type=str,
|
||||
default="",
|
||||
help="The input location. Can be a GCS or local file path.")
|
||||
|
||||
# TODO(jlewi): The following arguments are deprecated; just
|
||||
# use input_data. We should remove them as soon as all call sites
|
||||
# are updated.
|
||||
parser.add_argument(
|
||||
"--input_data_gcs_bucket", type=str, default="kubeflow-examples")
|
||||
parser.add_argument(
|
||||
|
@ -43,7 +67,15 @@ def main(): # pylint: disable=too-many-statements
|
|||
default="github-issue-summarization-data/github-issues.zip")
|
||||
|
||||
parser.add_argument(
|
||||
"--output_model_gcs_bucket", type=str, default="kubeflow-examples")
|
||||
"--output_model",
|
||||
type=str,
|
||||
default="",
|
||||
help="The output location for the model GCS or local file path.")
|
||||
|
||||
# TODO(jlewi): We should get rid of the following arguments and just use
|
||||
# --output_model_h5. If the output is a gs:// location we should use
|
||||
# a local file and then upload it to GCS.
|
||||
parser.add_argument("--output_model_gcs_bucket", type=str, default="")
|
||||
parser.add_argument(
|
||||
"--output_model_gcs_path",
|
||||
type=str,
|
||||
|
@ -64,23 +96,55 @@ def main(): # pylint: disable=too-many-statements
|
|||
parser.add_argument("--output_model_h5", type=str, default="output_model.h5")
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format=('%(levelname)s|%(asctime)s'
|
||||
'|%(pathname)s|%(lineno)d| %(message)s'),
|
||||
datefmt='%Y-%m-%dT%H:%M:%S',
|
||||
)
|
||||
logging.getLogger().setLevel(logging.INFO)
|
||||
logging.info(args)
|
||||
|
||||
learning_rate = float(args.learning_rate)
|
||||
|
||||
pd.set_option('display.max_colwidth', 500)
|
||||
|
||||
bucket = storage.Bucket(storage.Client(), args.input_data_gcs_bucket)
|
||||
storage.Blob(args.input_data_gcs_path,
|
||||
bucket).download_to_filename('github-issues.zip')
|
||||
# For backwords compatibility
|
||||
input_data_gcs_bucket = None
|
||||
input_data_gcs_path = None
|
||||
|
||||
zip_ref = zipfile.ZipFile('github-issues.zip', 'r')
|
||||
zip_ref.extractall('.')
|
||||
zip_ref.close()
|
||||
if not args.input_data:
|
||||
# Since input_data isn't set fall back on old arguments.
|
||||
input_data_gcs_bucket = args.input_data_gcs_bucket
|
||||
input_data_gcs_path = args.input_data_gcs_path
|
||||
else:
|
||||
if args.input_data.startswith('gs://'):
|
||||
input_data_gcs_bucket, input_data_gcs_path = split_gcs_uri(
|
||||
args.input_data)
|
||||
|
||||
if input_data_gcs_bucket:
|
||||
logging.info("Download bucket %s object %s.", input_data_gcs_bucket,
|
||||
input_data_gcs_path)
|
||||
bucket = storage.Bucket(storage.Client(), input_data_gcs_bucket)
|
||||
args.input_data = 'github-issues.zip'
|
||||
storage.Blob(input_data_gcs_path, bucket).download_to_filename(
|
||||
args.input_data)
|
||||
|
||||
ext = os.path.splitext(args.input_data)[-1]
|
||||
if ext.lower() == '.zip':
|
||||
zip_ref = zipfile.ZipFile(args.input_data, 'r')
|
||||
zip_ref.extractall('.')
|
||||
zip_ref.close()
|
||||
# TODO(jlewi): Hardcoding the file in the Archive to use is brittle.
|
||||
# We should probably just require the input to be a CSV file.
|
||||
csv_file = 'github_issues.csv'
|
||||
else:
|
||||
csv_file = args.input_data
|
||||
|
||||
# Read in data sample 2M rows (for speed of tutorial)
|
||||
traindf, testdf = train_test_split(
|
||||
pd.read_csv('github_issues.csv').sample(n=args.sample_size), test_size=.10)
|
||||
pd.read_csv(csv_file).sample(n=args.sample_size), test_size=.10)
|
||||
|
||||
# Print stats about the shape of the data.
|
||||
logging.info('Train: %d rows %d columns', traindf.shape[0], traindf.shape[1])
|
||||
|
@ -119,8 +183,7 @@ def main(): # pylint: disable=too-many-statements
|
|||
np.save(args.output_train_title_vecs_npy, train_title_vecs)
|
||||
np.save(args.output_train_body_vecs_npy, train_body_vecs)
|
||||
|
||||
_, doc_length = load_encoder_inputs(
|
||||
args.output_train_body_vecs_npy)
|
||||
_, doc_length = load_encoder_inputs(args.output_train_body_vecs_npy)
|
||||
|
||||
num_encoder_tokens, body_pp = load_text_processor(
|
||||
args.output_body_preprocessor_dpkl)
|
||||
|
@ -196,9 +259,25 @@ def main(): # pylint: disable=too-many-statements
|
|||
######################
|
||||
# Upload model to GCS.
|
||||
######################
|
||||
bucket = storage.Bucket(storage.Client(), args.output_model_gcs_bucket)
|
||||
storage.Blob(args.output_model_gcs_path, bucket).upload_from_filename(
|
||||
args.output_model_h5)
|
||||
# For backwords compatibility
|
||||
output_model_gcs_bucket = None
|
||||
output_model_gcs_path = None
|
||||
|
||||
if not args.output_model:
|
||||
# Since input_data isn't set fall back on old arguments.
|
||||
output_model_gcs_bucket = args.output_model_gcs_bucket
|
||||
output_model_gcs_path = args.output_model_gcs_path
|
||||
else:
|
||||
if args.output_model.startswith('gs://'):
|
||||
output_model_gcs_bucket, output_model_gcs_path = split_gcs_uri(
|
||||
args.output_model)
|
||||
|
||||
if output_model_gcs_bucket:
|
||||
logging.info("Uploading model to bucket %s path %s.",
|
||||
output_model_gcs_bucket, output_model_gcs_path)
|
||||
bucket = storage.Bucket(storage.Client(), output_model_gcs_bucket)
|
||||
storage.Blob(output_model_gcs_path, bucket).upload_from_filename(
|
||||
args.output_model_h5)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
@ -10,11 +10,44 @@ The [notebooks](notebooks) directory contains the necessary files to create a im
|
|||
|
||||
```commandline
|
||||
cd notebooks/
|
||||
docker build . -t gcr.io/agwl-kubeflow/tf-job-issue-summarization:latest
|
||||
gcloud docker -- push gcr.io/agwl-kubeflow/tf-job-issue-summarization:latest
|
||||
make PROJECT=${PROJECT} set-image
|
||||
```
|
||||
## Train Using PVC
|
||||
|
||||
If you don't have access to GCS or don't want to use GCS you
|
||||
can use a persistent volume to store the data and model.
|
||||
|
||||
Create a pvc
|
||||
|
||||
```
|
||||
ks apply --env=${KF_ENV} -c data-pvc
|
||||
```
|
||||
|
||||
* Your cluster must have a default storage class defined for
|
||||
this to work.
|
||||
|
||||
Run the job to download the data to the PVC.
|
||||
|
||||
```
|
||||
ks apply --env=${KF_ENV} -c data-downloader
|
||||
```
|
||||
|
||||
## GCS Service account
|
||||
Submit the training job
|
||||
|
||||
```
|
||||
ks apply --env=${KF_ENV} -c tfjob-pvc
|
||||
```
|
||||
|
||||
The resulting model will be stored on PVC so to access it you will
|
||||
need to run a pod and attach the PVC. For serving you can just
|
||||
attach it the pod serving the model.
|
||||
|
||||
## Training Using GCS
|
||||
|
||||
If you are running on GCS you can train using GCS to store the input
|
||||
and the resulting model.
|
||||
|
||||
### GCS Service account
|
||||
|
||||
* Create a service account which will be used to read and write data from the GCS Bucket.
|
||||
|
||||
|
@ -39,7 +72,7 @@ kubectl --namespace=${NAMESPACE} create secret generic gcp-credentials --from-fi
|
|||
```
|
||||
|
||||
|
||||
## Run the TFJob using your image
|
||||
### Run the TFJob using your image
|
||||
|
||||
[ks-kubeflow](ks-kubeflow) contains a ksonnet app to deploy the TFJob.
|
||||
|
||||
|
|
Loading…
Reference in New Issue