Add dataflow launcher script (#364)

* add search index creator container

* add pipeline

* update op name

* update readme

* update scripts

* typo fix

* Update Makefile

* Update Makefile

* address comments

* fix ks

* update pipeline

* restructure the images

* remove echo

* update image

* add code embedding launcher

* small fixes

* format

* format

* address comments

* add flag

* Update arguments.py

* update parameter

* revert to use --wait_until_finished. --wait_until_finish never works

* update image
IronPan 2018-11-27 19:23:54 -08:00 committed by k8s-ci-robot
parent 760ba7b9e8
commit 7ffc50e0ee
12 changed files with 120 additions and 71 deletions

@@ -7,20 +7,20 @@
{
"id": "build",
"name": "gcr.io/cloud-builders/docker",
"args": ["build", "-t", "gcr.io/kubeflow-examples/code-search/index_updater:" + std.extVar("tag"),
"args": ["build", "-t", "gcr.io/kubeflow-examples/code-search/ks:" + std.extVar("tag"),
"--label=git-versions=" + std.extVar("gitVersion"),
"--file=docker/index_updater/Dockerfile",
"--file=docker/ks/Dockerfile",
"."],
},
{
"id": "tag",
"name": "gcr.io/cloud-builders/docker",
"args": ["tag", "gcr.io/kubeflow-examples/code-search/index_updater:" + std.extVar("tag"),
"gcr.io/kubeflow-examples/code-search/index_updater:latest",],
"args": ["tag", "gcr.io/kubeflow-examples/code-search/ks:" + std.extVar("tag"),
"gcr.io/kubeflow-examples/code-search/ks:latest",],
"waitFor": ["build"],
},
],
"images": ["gcr.io/kubeflow-examples/code-search/index_updater:" + std.extVar("tag"),
"gcr.io/kubeflow-examples/code-search/index_updater:latest",
"images": ["gcr.io/kubeflow-examples/code-search/ks:" + std.extVar("tag"),
"gcr.io/kubeflow-examples/code-search/ks:latest",
],
}
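
For context (not part of the diff): the tag and gitVersion values above come from std.extVar, so the build config has to be materialized with those variables before it is submitted. A minimal sketch, assuming the jsonnet and gcloud CLIs are available; the actual entry point in this repo is the Makefile, and the file name and variable values below are placeholders:

# Hypothetical invocation; TAG, GIT_VERSION, and the config filename are illustrative.
TAG=v20181127-abcdef
GIT_VERSION=$(git describe --always --dirty)
jsonnet cloudbuild.jsonnet --ext-str tag="${TAG}" --ext-str gitVersion="${GIT_VERSION}" > cloudbuild.json
gcloud builds submit --config=cloudbuild.json .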

@@ -0,0 +1,8 @@
#!/usr/bin/env bash
# Common logic to initialize kubectl to use the underlying cluster
kubectl config set-cluster "${cluster}" --server=https://kubernetes.default --certificate-authority=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt
kubectl config set-credentials pipeline --token "$(cat /var/run/secrets/kubernetes.io/serviceaccount/token)"
kubectl config set-context kubeflow --cluster "${cluster}" --user pipeline
kubectl config use-context kubeflow
ks env set "${ksEnvName}" --namespace="${namespace}"
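
As a usage note (not part of the diff): this snippet expects cluster, namespace, and ksEnvName to already be set (parse_arguments.sh handles that in the launcher scripts) and assumes it runs inside a pod with the default service account token mounted. A minimal sketch of how a launcher sources it:

# Hypothetical values; the real ones come from parse_arguments.sh.
cluster=my-kubeflow-cluster   # assumption: name of the underlying cluster
namespace=kubeflow            # assumption: namespace the ks environment should target
ksEnvName=pipeline            # the launcher scripts always use the "pipeline" environment
DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" > /dev/null && pwd)"
source "${DIR}/initialize_kubectl.sh"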

@@ -24,20 +24,15 @@ usage() {
names=(workingDir workflowId dataDir namespace cluster)
source "${DIR}/parse_arguments.sh"
# Configure kubectl to use the underlying cluster
kubectl config set-cluster "${cluster}" --server=https://kubernetes.default --certificate-authority=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt
kubectl config set-credentials pipeline --token "$(cat /var/run/secrets/kubernetes.io/serviceaccount/token)"
kubectl config set-context kubeflow --cluster "${cluster}" --user pipeline
kubectl config use-context kubeflow
ks env set "${ksEnvName}" --namespace="${namespace}"
source "${DIR}/initialize_kubectl.sh"
# Apply parameters
ks param set ${component} dataDir ${dataDir} --env ${ksEnvName}
ks param set ${component} jobNameSuffix ${workflowId} --env ${ksEnvName}
ks param set ${component} lookupFile ${workingDir}/code-embeddings-index/${workflowId}/embedding-to-info.csv --env ${ksEnvName}
ks param set ${component} indexFile ${workingDir}/code-embeddings-index/${workflowId}/embeddings.index --env ${ksEnvName}
ks param set ${component} lookupFile ${workingDir}/code-embeddings-index/embedding-to-info.csv --env ${ksEnvName}
ks param set ${component} indexFile ${workingDir}/code-embeddings-index/embeddings.index --env ${ksEnvName}
ks show ${ksEnvName} -c "${component}"
ks apply ${ksEnvName} -c "${component}"
JOB_NAME="pipeline-create-search-index-${workflowId}"
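For illustration only, an invocation of this launcher with the required parameters listed above (all values are placeholders):

# Hypothetical example; bucket, workflow id, and cluster name are made up.
./launch_search_index_creator_job.sh \
  --workingDir=gs://my-bucket/my-run \
  --workflowId=wf-123 \
  --dataDir=gs://my-bucket/my-run/data \
  --namespace=kubeflow \
  --cluster=my-kubeflow-cluster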

@@ -0,0 +1,53 @@
#!/bin/bash
# This script is a wrapper for the submit-code-embeddings-job ksonnet component.
# It creates a Kubernetes job that submits a Dataflow job to compute code function embeddings.
# For more details about the submit-code-embeddings-job ksonnet component, see
# https://github.com/kubeflow/examples/blob/master/code_search/kubeflow/components/submit-code-embeddings-job.libsonnet
set -ex
DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" > /dev/null && pwd)"
# Providing a negative value to kubectl wait makes it wait for up to a week
timeout="-1s"
# Ksonnet Environment name. Always use pipeline
ksEnvName="pipeline"
# submit code embeddings job ksonnet component name
component="submit-code-embeddings-job"
# default number of dataflow workers
numWorkers=5
# default dataflow worker machine type
workerMachineType=n1-highcpu-32
usage() {
echo "Usage: submit_code_embeddings_job.sh --workflowId=<workflow id invoking the container> --modelDir=<directory contains the model>
--dataDir=<data dir> --numWorkers=<num of workers> --project=<project> --targetDataset=<target BQ dataset>
--workerMachineType=<worker machine type> --workingDir=<working dir> --cluster=<cluster to deploy job to>"
}
# List of required parameters
names=(dataDir modelDir targetDataset workingDir workflowId cluster)
source "${DIR}/parse_arguments.sh"
source "${DIR}/initialize_kubectl.sh"
# Apply parameters
ks param set ${component} jobNameSuffix ${workflowId} --env ${ksEnvName}
ks param set ${component} dataDir ${dataDir} --env ${ksEnvName}
ks param set ${component} modelDir ${modelDir} --env ${ksEnvName}
ks param set ${component} project ${project} --env ${ksEnvName}
ks param set ${component} targetDataset ${targetDataset} --env ${ksEnvName}
ks param set ${component} workingDir ${workingDir} --env ${ksEnvName}
ks param set ${component} numWorkers ${numWorkers} --env ${ksEnvName}
ks param set ${component} workerMachineType ${workerMachineType} --env ${ksEnvName}
ks show ${ksEnvName} -c "${component}"
ks apply ${ksEnvName} -c "${component}"
JOB_NAME="pipeline-embed-code-${workflowId}"
echo "wait for ${JOB_NAME} to finish"
kubectl wait --timeout="${timeout}" --for=condition=complete job/${JOB_NAME} -n "${namespace}"
# If the wait above fails, the script fails fast and the following command won't run.
# TODO: complete doesn't mean it's successful. Check the job succeeded.
echo "${JOB_NAME} succeeded"

@@ -20,13 +20,6 @@
},
"pipeline": {
name: "pipeline",
project: "code-search-demo",
problem: "kf_github_function_docstring",
// modelBasePath shouldn't have integer in it.
modelBasePath: "gs://code-search-demo/models/20181107-dist-sync-gpu/export/",
lookupFile: "null",
indexFile: "null",
},
}

@@ -15,7 +15,8 @@
// are not picked up by the individual components.
// Need to see if we can find a way to fix this.
local imageTag = "v20181117-3c030ae-dirty-4d809c",
local imageTag = "v20181127-08f8c05-dirty-d9f034",
"t2t-job": {
jobType: "trainer",
numChief: 0,
@@ -82,6 +83,17 @@
jobNameSuffix: "null",
image: $.components["t2t-job"].dataflowImage,
dataDir: $.components["t2t-code-search"].workingDir + "/data",
lookupFile: "null",
indexFile: "null",
},
"search-index-server": {
// Most defaults should be defined in experiments.libsonnet.
// Parameters will be used to override those values.
name: "search-index-server",
servingUrl: "http://t2t-code-search.kubeflow:8500/v1/models/t2t-code-search:predict",
// 1 replica is convenient for debugging, but we should bump it up after debugging.
replicas: 1,
image: "gcr.io/kubeflow-examples/code-search-ui:v20181122-dc0e646-dirty-043a63",
},
"submit-preprocess-job": {
name: "submit-preprocess-job",
@@ -106,9 +118,11 @@
// Directory where the model is stored.
modelDir: "",
jobName: "submit-code-embeddings-job",
jobNameSuffix: "",
workerMachineType: "n1-highcpu-32",
numWorkers: 5,
project: "",
waitUntilFinish: "false",
},
tensorboard: {

@@ -5,7 +5,7 @@
apiVersion: "batch/v1",
kind: "Job",
metadata: {
name: params.name,
name: params.name + '-' + params.jobNameSuffix,
namespace: env.namespace,
labels: {
app: params.name,
@@ -35,13 +35,16 @@
"--target_dataset=" + params.targetDataset,
"--data_dir=" + params.dataDir,
"--problem=" + params.problem,
"--job_name=" + params.jobName,
"--job_name=" + params.jobName + '-' + params.jobNameSuffix,
"--saved_model_dir=" + params.modelDir,
"--temp_location=" + params.workingDir + "/dataflow/temp",
"--staging_location=" + params.workingDir + "/dataflow/staging",
"--worker_machine_type=" + params.workerMachineType,
"--num_workers=" + params.numWorkers,
"--requirements_file=requirements.dataflow.txt",
if (params.waitUntilFinish == "true") then
"--wait_until_finished"
else [],
],
env: [
{

@@ -2,8 +2,6 @@
// Warning: Do not define a global "image" as that will end up overriding
// the image parameter for all components. Define more specific names
// e.g. "dataflowImage", "trainerCpuImage", "trainerGpuImage",
workingDir: "gs://code-search-demo/20181104",
dataDir: "gs://code-search-demo/20181104/data",
project: "code-search-demo",
experiment: "pipeline",
waitUntilFinish: "true",
}

@@ -2,8 +2,10 @@ from typing import Dict
from kubernetes import client as k8s_client
import kfp.dsl as dsl
# disable max arg lint check
#pylint: disable=R0913
# pylint: disable=R0913
def default_gcp_op(name: str, image: str, command: str = None,
arguments: str = None, file_inputs: Dict[dsl.PipelineParam, str] = None,
@@ -57,33 +59,23 @@ def default_gcp_op(name: str, image: str, command: str = None,
)
def dataflow_function_embedding_op(
project: 'GcpProject', runner: str, target_dataset: str, problem: str,
data_dir: 'GcsUri', saved_model_dir: 'GcsUri', temp_location: 'GcsUri',
staging_location: 'GcsUri',
job_name: str, worker_machine_type: str,
num_workers: int, step_name='dataflow_function_embedding'):
project: 'GcpProject', cluster_name: str, target_dataset: str, data_dir: 'GcsUri',
saved_model_dir: 'GcsUri', workflow_id: str, worker_machine_type: str,
num_workers: int, working_dir: str, step_name='dataflow_function_embedding'):
return default_gcp_op(
name=step_name,
image='gcr.io/kubeflow-examples/code-search-dataflow:latest',
command=[
'python2',
'-m',
'code_search.dataflow.cli.create_function_embeddings',
],
image='gcr.io/kubeflow-examples/code-search-ks:v20181127-08f8c05-dirty-19ca4c',
command=['/usr/local/src/submit_code_embeddings_job.sh'],
arguments=[
'--project', project,
'--runner', runner,
'--target_dataset', target_dataset,
'--problem', problem,
'--data_dir', data_dir,
'--saved_model_dir', saved_model_dir,
'--job_name', job_name,
'--temp_location', temp_location,
'--staging_location', staging_location,
'--worker_machine_type', worker_machine_type,
'--num_workers', num_workers,
'--requirements_file', 'requirements.dataflow.txt',
'--wait_until_finished',
"--workflowId=%s" % workflow_id,
"--modelDir=%s" % saved_model_dir,
"--dataDir=%s" % data_dir,
"--numWorkers=%s" % num_workers,
"--project=%s" % project,
"--targetDataset=%s" % target_dataset,
"--workerMachineType=%s" % worker_machine_type,
"--workingDir=%s" % working_dir,
'--cluster=%s' % cluster_name,
]
)
@@ -93,7 +85,7 @@ def search_index_creator_op(
return dsl.ContainerOp(
# use component name as step name
name='search_index_creator',
image='gcr.io/kubeflow-examples/code-search-ks:v20181126-e62ebca-dirty-4103da',
image='gcr.io/kubeflow-examples/code-search-ks:v20181127-08f8c05-dirty-19ca4c',
command=['/usr/local/src/launch_search_index_creator_job.sh'],
arguments=[
'--workingDir=%s' % working_dir,
@@ -113,27 +105,21 @@
def function_embedding_update(
project,
working_dir,
data_dir,
saved_model_dir,
cluster_name,
namespace,
problem=dsl.PipelineParam(
name='problem', value='kf_github_function_docstring'),
runner=dsl.PipelineParam(name='runnder', value='DataflowRunner'),
target_dataset=dsl.PipelineParam(
name='target-dataset', value='code_search'),
worker_machine_type=dsl.PipelineParam(
name='worker-machine-type', value='n1-highcpu-32'),
target_dataset=dsl.PipelineParam(name='target-dataset', value='code_search'),
worker_machine_type=dsl.PipelineParam(name='worker-machine-type', value='n1-highcpu-32'),
num_workers=dsl.PipelineParam(name='num-workers', value=5)):
workflow_name = '{{workflow.name}}'
temp_location = '%s/dataflow/%s/temp' % (working_dir, workflow_name)
staging_location = '%s/dataflow/%s/staging' % (working_dir, workflow_name)
working_dir = '%s/%s' % (working_dir, workflow_name)
data_dir = '%s/data' % working_dir
function_embedding = dataflow_function_embedding_op(
project, runner, target_dataset, problem, data_dir,
saved_model_dir, temp_location, staging_location, workflow_name,
worker_machine_type, num_workers)
project, cluster_name, target_dataset, data_dir,
saved_model_dir,
workflow_name, worker_machine_type, num_workers, working_dir)
search_index_creator_op(
working_dir, data_dir, workflow_name, cluster_name, namespace).after(function_embedding)
working_dir, data_dir, workflow_name, cluster_name, namespace).after(function_embedding)
if __name__ == '__main__':

@@ -29,7 +29,7 @@ def add_parser_arguments(parser):
help='BigQuery dataset for output results')
additional_args_parser.add_argument('--pre_transformed', action='store_true',
help='Use a pre-transformed BigQuery dataset')
additional_args_parser.add_argument('--wait_until_finish', action='store_true',
additional_args_parser.add_argument('--wait_until_finished', action='store_true',
help='Wait until preprocess job is finished')
additional_args_parser.add_argument('--github_files', default='',

@@ -49,7 +49,7 @@ def create_function_embeddings(argv=None):
result = pipeline.run()
logging.info("Submitted Dataflow job: %s", result)
if args.wait_until_finish:
if args.wait_until_finished:
result.wait_until_finish()
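
For reference, with the renamed flag the Dataflow launcher inside the container ends up being invoked roughly as follows. This is a sketch assembled from the component arguments shown above; every value is a placeholder, not taken from the repo.

# Hypothetical command line; flag names mirror the jsonnet component arguments.
python2 -m code_search.dataflow.cli.create_function_embeddings \
  --project=my-gcp-project \
  --runner=DataflowRunner \
  --target_dataset=code_search \
  --problem=kf_github_function_docstring \
  --data_dir=gs://my-bucket/my-run/data \
  --saved_model_dir=gs://my-bucket/models/export \
  --job_name=submit-code-embeddings-job-wf-123 \
  --temp_location=gs://my-bucket/my-run/dataflow/temp \
  --staging_location=gs://my-bucket/my-run/dataflow/staging \
  --worker_machine_type=n1-highcpu-32 \
  --num_workers=5 \
  --requirements_file=requirements.dataflow.txt \
  --wait_until_finished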

@@ -7,5 +7,4 @@ nltk~=3.3.0
oauth2client~=4.1.0
spacy~=2.0.0
tensor2tensor~=1.9.0
tensorflow~=1.11.0
pybind11~=2.2.4