Use the client libs to do a GCS copy instead of gsutil (#558)

* use the GCS client libs to copy the checkpoint dir

* more minor cleanup: use a tagged image and the newer pipeline param spec syntax; pylint cleanup.
added set_memory_limit() to the notebook pipeline training steps.
modified the pipeline definitions to use the user-defined params as defaults.

* put a retry loop around the copy_blob call
Authored by Amy on 2019-05-17 14:00:11 -07:00; committed by Kubernetes Prow Robot
parent 21f76812ec, commit 767ecd240d
5 changed files with 99 additions and 36 deletions


@@ -20,11 +20,13 @@ RUN apt-get install --no-install-recommends -y -q ca-certificates python-dev pyt
wget unzip git
RUN easy_install pip
RUN pip install --upgrade pip
RUN pip install tensorflow-probability==0.5
RUN pip install tensor2tensor==1.11.0
RUN pip install tensorflow_hub==0.1.1
RUN pip install pyyaml==3.12 six==1.11.0
RUN pip install google-cloud-storage
RUN wget -nv https://dl.google.com/dl/cloudsdk/release/google-cloud-sdk.zip && \
unzip -qq google-cloud-sdk.zip -d /tools && \

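For context on the Dockerfile change: the newly installed google-cloud-storage package is what the training script imports as google.cloud.storage. A minimal sanity-check sketch for the built image (Client() needs GCP credentials available at runtime; the check itself is not part of the commit):

from google.cloud import storage

# Constructing a client verifies the library is importable and wired up;
# it picks up application-default credentials inside the container.
client = storage.Client()
print(client.project)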

@@ -12,19 +12,75 @@
# See the License for the specific language governing permissions and
# limitations under the License.
"""..."""
import argparse
import json
import logging
import subprocess
import time
from urlparse import urlparse
from google.cloud import storage
# location of the model checkpoint from which we'll start our training
SOURCE_BUCKET = 'aju-dev-demos-codelabs'
PREFIX = 'kubecon/model_output_tbase.bak2019000/'
def copy_blob(storage_client, source_bucket, source_blob, target_bucket_name, new_blob_name,
              new_blob_prefix, prefix):
  """Copies a blob from one bucket to another with a new name."""
  target_bucket = storage_client.get_bucket(target_bucket_name)
  new_blob_name_trimmed = new_blob_name.replace(prefix, '')
  new_blob_full_name = new_blob_prefix + '/' + new_blob_name_trimmed
  new_blob = source_bucket.copy_blob(
      source_blob, target_bucket, new_blob_full_name)
  logging.info('blob %s in bucket %s copied to blob %s in bucket %s',
               str(source_blob.name), str(source_bucket.name),
               str(new_blob.name), str(target_bucket.name))
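To make the renaming logic in copy_blob concrete: the source prefix is stripped from the blob name and the per-run prefix is prepended. A small worked example (the run prefix 'run-0001/model_output' and the checkpoint filename are hypothetical):

prefix = 'kubecon/model_output_tbase.bak2019000/'
source_name = prefix + 'model.ckpt-1000.index'
new_blob_prefix = 'run-0001/model_output'

trimmed = source_name.replace(prefix, '')      # 'model.ckpt-1000.index'
target_name = new_blob_prefix + '/' + trimmed  # 'run-0001/model_output/model.ckpt-1000.index'
print(target_name)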
def copy_checkpoint(new_blob_prefix, target_bucket):
  """Copies the source model checkpoint into the target bucket, with retries."""
  storage_client = storage.Client()
  source_bucket = storage_client.bucket(SOURCE_BUCKET)
  retries = 10
  # List the source objects with the given prefix.
  blob_list = list(source_bucket.list_blobs(prefix=PREFIX))
  logging.info('Copying files:')
  for blob in blob_list:
    sleeptime = 0.1
    num_retries = 0
    while num_retries < retries:
      logging.info('copying %s; retry %s', blob.name, num_retries)
      try:
        copy_blob(storage_client, source_bucket, blob, target_bucket, blob.name,
                  new_blob_prefix, PREFIX)
        break
      except Exception as e:  # pylint: disable=broad-except
        logging.warning(e)
        # Back off exponentially before retrying this blob.
        time.sleep(sleeptime)
        sleeptime *= 2
        num_retries += 1
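The backoff above starts at 0.1s and doubles after each failed attempt, so a blob that never copies waits about 0.1 * (2^10 - 1) ≈ 102 seconds in total across the 10 tries before being skipped. A sketch of a direct invocation (the prefix and bucket values are hypothetical):

# Copies the source checkpoint under gs://my-working-bucket/run-0001/model_output/...
copy_checkpoint(new_blob_prefix='run-0001/model_output',
                target_bucket='my-working-bucket')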
def main():
  logging.getLogger().setLevel(logging.INFO)
  parser = argparse.ArgumentParser(description='ML Trainer')
  parser.add_argument(
      '--model-dir',
      help='...',
      required=True)
  parser.add_argument(
      '--working-dir',
      help='...',
      required=True)
  parser.add_argument(
      '--data-dir',
      help='...',
@@ -62,11 +118,13 @@ def main():
  print("model_startpoint: %s" % model_startpoint)
  model_dir = args.model_dir
  print("model_dir: %s" % model_dir)
  model_copy_command = ['gsutil', '-m', 'cp', '-r', model_startpoint, model_dir]
  print(model_copy_command)
  result1 = subprocess.call(model_copy_command)
  print(result1)
  # copy over the checkpoint directory
  target_bucket = urlparse(args.working_dir).netloc
  print("target bucket: %s" % target_bucket)
  new_blob_prefix = model_dir.replace('gs://' + target_bucket + '/', '')
  print("new_blob_prefix: %s" % new_blob_prefix)
  copy_checkpoint(new_blob_prefix, target_bucket)
  print('training steps (total): %s' % args.train_steps)
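For reference on the bucket extraction above: urlparse treats the gs:// bucket as the URI's netloc, so the working-dir URI splits cleanly into bucket and object path. A tiny check (the URI is hypothetical; on Python 3 the module is urllib.parse):

from urlparse import urlparse

# The netloc of 'gs://my-working-bucket/some/path' is the bucket name.
print(urlparse('gs://my-working-bucket/some/path').netloc)  # my-working-bucket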


@@ -22,24 +22,22 @@ import kfp.gcp as gcp
  description='Demonstrate Tensor2Tensor-based training and TF-Serving'
)
def gh_summ(  #pylint: disable=unused-argument
  train_steps: dsl.PipelineParam = dsl.PipelineParam(name='train-steps', value=2019300),
  project: dsl.PipelineParam = dsl.PipelineParam(name='project', value='YOUR_PROJECT_HERE'),
  github_token: dsl.PipelineParam = dsl.PipelineParam(
      name='github-token', value='YOUR_GITHUB_TOKEN_HERE'),
  working_dir: dsl.PipelineParam = dsl.PipelineParam(name='working-dir', value='YOUR_GCS_DIR_HERE'),
  checkpoint_dir: dsl.PipelineParam = dsl.PipelineParam(
      name='checkpoint-dir',
      value='gs://aju-dev-demos-codelabs/kubecon/model_output_tbase.bak2019000'),
  deploy_webapp: dsl.PipelineParam = dsl.PipelineParam(name='deploy-webapp', value='true'),
  data_dir: dsl.PipelineParam = dsl.PipelineParam(
      name='data-dir', value='gs://aju-dev-demos-codelabs/kubecon/t2t_data_gh_all/')):
  train_steps=2019300,
  project='YOUR_PROJECT_HERE',
  github_token='YOUR_GITHUB_TOKEN_HERE',
  working_dir='YOUR_GCS_DIR_HERE',
  checkpoint_dir='gs://aju-dev-demos-codelabs/kubecon/model_output_tbase.bak2019000',
  deploy_webapp='true',
  data_dir='gs://aju-dev-demos-codelabs/kubecon/t2t_data_gh_all/'
):
  train = dsl.ContainerOp(
      name='train',
      image='gcr.io/google-samples/ml-pipeline-t2ttrain',
      image='gcr.io/google-samples/ml-pipeline-t2ttrain:v1ap',
      arguments=["--data-dir", data_dir,
                 "--checkpoint-dir", checkpoint_dir,
                 "--working-dir", working_dir,
                 "--model-dir", '%s/%s/model_output' % (working_dir, '{{workflow.name}}'),
                 "--train-steps", train_steps, "--deploy-webapp", deploy_webapp],
      file_outputs={'output': '/tmp/output'}
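The new parameter style above (plain defaults instead of dsl.PipelineParam annotations) compiles the same way as before. A minimal sketch of compiling the pipeline, assuming the KFP SDK of this vintage (the output filename is arbitrary):

import kfp.compiler as compiler

# Compile the @dsl.pipeline-decorated function into a deployable archive.
compiler.Compiler().compile(gh_summ, 'gh_summ.tar.gz')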


@@ -118,20 +118,21 @@
" description='Demonstrate Tensor2Tensor-based training and TF-Serving'\n",
")\n",
"def gh_summ(\n",
" train_steps: dsl.PipelineParam=dsl.PipelineParam(name='train-steps', value=2019300),\n",
" project: dsl.PipelineParam=dsl.PipelineParam(name='project', value='YOUR_PROJECT_HERE'),\n",
" github_token: dsl.PipelineParam=dsl.PipelineParam(name='github-token', value='YOUR_GITHUB_TOKEN_HERE'),\n",
" working_dir: dsl.PipelineParam=dsl.PipelineParam(name='working-dir', value='YOUR_GCS_DIR_HERE'),\n",
" checkpoint_dir: dsl.PipelineParam=dsl.PipelineParam(name='checkpoint-dir', value='gs://aju-dev-demos-codelabs/kubecon/model_output_tbase.bak2019000'),\n",
" deploy_webapp: dsl.PipelineParam=dsl.PipelineParam(name='deploy-webapp', value='true'),\n",
" data_dir: dsl.PipelineParam=dsl.PipelineParam(name='data-dir', value='gs://aju-dev-demos-codelabs/kubecon/t2t_data_gh_all/')):\n",
"\n",
" train_steps = 2019300,\n",
" project = PROJECT_NAME,\n",
" github_token = GITHUB_TOKEN,\n",
" working_dir = GITHUB_TOKEN,\n",
" checkpoint_dir = 'gs://aju-dev-demos-codelabs/kubecon/model_output_tbase.bak2019000',\n",
" deploy_webapp = 'true',\n",
" data_dir = 'gs://aju-dev-demos-codelabs/kubecon/t2t_data_gh_all/'\n",
" ):\n",
"\n",
" train = dsl.ContainerOp(\n",
" name = 'train',\n",
" image = 'gcr.io/google-samples/ml-pipeline-t2ttrain',\n",
" image = 'gcr.io/google-samples/ml-pipeline-t2ttrain:v1ap',\n",
" arguments = [ \"--data-dir\", data_dir,\n",
" \"--checkpoint-dir\", checkpoint_dir,\n",
" \"--working-dir\", working_dir,\n",
" \"--model-dir\", '%s/%s/model_output' % (working_dir, '{{workflow.name}}'),\n",
" \"--train-steps\", train_steps, \"--deploy-webapp\" , deploy_webapp],\n",
" file_outputs={'output': '/tmp/output'}\n",
@@ -147,6 +148,7 @@
" )\n",
" serve.after(train)\n",
" train.set_gpu_limit(4)\n",
" train.set_memory_limit('48G')\n",
"\n",
" with dsl.Condition(train.output=='true'):\n",
" webapp = dsl.ContainerOp(\n",
@@ -254,13 +256,14 @@
" description='Demonstrate TFT-based feature processing, TFMA, TFJob, CMLE OP, and TF-Serving'\n",
")\n",
"def gh_summ2(\n",
" train_steps: dsl.PipelineParam=dsl.PipelineParam(name='train-steps', value=2019300),\n",
" project: dsl.PipelineParam=dsl.PipelineParam(name='project', value='YOUR_PROJECT_HERE'),\n",
" github_token: dsl.PipelineParam=dsl.PipelineParam(name='github-token', value='YOUR_GITHUB_TOKEN_HERE'),\n",
" working_dir: dsl.PipelineParam=dsl.PipelineParam(name='working-dir', value='YOUR_GCS_DIR_HERE'),\n",
" checkpoint_dir: dsl.PipelineParam=dsl.PipelineParam(name='checkpoint-dir', value='gs://aju-dev-demos-codelabs/kubecon/model_output_tbase.bak2019000'),\n",
" deploy_webapp: dsl.PipelineParam=dsl.PipelineParam(name='deploy-webapp', value='true'),\n",
" data_dir: dsl.PipelineParam=dsl.PipelineParam(name='data-dir', value='gs://aju-dev-demos-codelabs/kubecon/t2t_data_gh_all/')):\n",
" train_steps = 2019300,\n",
" project = PROJECT_NAME,\n",
" github_token = GITHUB_TOKEN,\n",
" working_dir = GITHUB_TOKEN,\n",
" checkpoint_dir = 'gs://aju-dev-demos-codelabs/kubecon/model_output_tbase.bak2019000',\n",
" deploy_webapp = 'true',\n",
" data_dir = 'gs://aju-dev-demos-codelabs/kubecon/t2t_data_gh_all/'\n",
" ):\n",
"\n",
" # The new pre-processing op.\n",
" preproc = preproc_op(project=project,\n",
@@ -268,9 +271,10 @@
"\n",
" train = dsl.ContainerOp(\n",
" name = 'train',\n",
" image = 'gcr.io/google-samples/ml-pipeline-t2ttrain',\n",
" image = 'gcr.io/google-samples/ml-pipeline-t2ttrain:v1ap',\n",
" arguments = [ \"--data-dir\", data_dir,\n",
" \"--checkpoint-dir\", checkpoint_dir,\n",
" \"--working-dir\", working_dir,\n",
" \"--model-dir\", '%s/%s/model_output' % (working_dir, '{{workflow.name}}'),\n",
" \"--train-steps\", train_steps, \"--deploy-webapp\" , deploy_webapp],\n",
" file_outputs={'output': '/tmp/output'}\n",
@@ -287,6 +291,7 @@
" )\n",
" serve.after(train)\n",
" train.set_gpu_limit(4)\n",
" train.set_memory_limit('48G') \n",
"\n",
" with dsl.Condition(train.output=='true'):\n",
" webapp = dsl.ContainerOp(\n",
@@ -379,7 +384,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.4"
"version": "3.6.7"
}
},
"nbformat": 4,