Add update search index pipeline (#361)

* add search index creator container

* add pipeline

* update op name

* update readme

* update scripts

* typo fix

* Update Makefile

* Update Makefile

* address comments

* fix ks

* update pipeline

* restructure the images

* remove echo

* update image

* format

* format

* address comments
This commit is contained in:
IronPan 2018-11-27 04:43:55 -08:00 committed by k8s-ci-robot
parent 15007fdeea
commit 31390d39a0
12 changed files with 285 additions and 65 deletions

View File

@ -72,15 +72,16 @@ build-ui-gcb:
gcloud builds submit --machine-type=n1-highcpu-32 --project=kubeflow-ci --config=./build/build.ui.json \
--timeout=3600 ./build
build-index-updater-gcb:
build-ks-gcb:
mkdir -p build
jsonnet ./docker/index_updater/build.jsonnet --ext-str gitVersion=$(GIT_VERSION) --ext-str tag=$(TAG) \
> ./build/build.index_updater.json
jsonnet ./docker/ks/build.jsonnet --ext-str gitVersion=$(GIT_VERSION) --ext-str tag=$(TAG) \
> ./build/build.ks.json
cp -r ./docker ./build/
cp -r ./kubeflow ./build/
cp -r ./src ./build/
rm -rf ./build/src/code_search/dataflow/cli/test_data
rm -rf ./build/src/code_search/t2t/test_data
gcloud builds submit --machine-type=n1-highcpu-32 --project=kubeflow-ci --config=./build/build.index_updater.json \
gcloud builds submit --machine-type=n1-highcpu-32 --project=kubeflow-ci --config=./build/build.ks.json \
--timeout=3600 ./build
# Build but don't attach the latest tag. This allows manual testing/inspection of the image

View File

@ -1,9 +0,0 @@
FROM ubuntu:xenial
RUN apt-get update && apt-get install -y wget &&\
rm -rf /var/lib/apt/lists/*
RUN wget -O /tmp/hub-linux-amd64-2.6.0.tgz https://github.com/github/hub/releases/download/v2.6.0/hub-linux-amd64-2.6.0.tgz && \
cd /usr/local && \
tar -xvf /tmp/hub-linux-amd64-2.6.0.tgz && \
ln -sf /usr/local/hub-linux-amd64-2.6.0/bin/hub /usr/local/bin/hub

View File

@ -1,3 +0,0 @@
# Index Updater
A Docker image and script suitable for updating the index served.

View File

@ -0,0 +1,23 @@
# Image used by the code-search pipeline steps that run ksonnet (ks) and
# kubectl, e.g. the search-index-creator launcher script.
FROM ubuntu:xenial

# wget is needed to fetch the tools below; clear the apt lists afterwards to
# keep the layer small.
RUN apt-get update && apt-get install -y wget &&\
    rm -rf /var/lib/apt/lists/*

# Install the GitHub "hub" CLI v2.6.0 (used for scripted GitHub interactions).
RUN wget -O /tmp/hub-linux-amd64-2.6.0.tgz https://github.com/github/hub/releases/download/v2.6.0/hub-linux-amd64-2.6.0.tgz && \
    cd /usr/local && \
    tar -xvf /tmp/hub-linux-amd64-2.6.0.tgz && \
    ln -sf /usr/local/hub-linux-amd64-2.6.0/bin/hub /usr/local/bin/hub

# Install ksonnet v0.12.0 and kubectl v1.11.2 so the scripts can configure and
# apply ksonnet components against the cluster.
RUN wget -O /opt/ks_0.12.0_linux_amd64.tar.gz \
    https://github.com/ksonnet/ksonnet/releases/download/v0.12.0/ks_0.12.0_linux_amd64.tar.gz && \
    tar -C /opt -xzf /opt/ks_0.12.0_linux_amd64.tar.gz && \
    cp /opt/ks_0.12.0_linux_amd64/ks /bin/. && \
    rm -f /opt/ks_0.12.0_linux_amd64.tar.gz && \
    wget -O /bin/kubectl \
    https://storage.googleapis.com/kubernetes-release/release/v1.11.2/bin/linux/amd64/kubectl && \
    chmod u+x /bin/kubectl

# Bake the ksonnet app and the helper scripts into the image; pipeline steps
# invoke the scripts from /usr/local/src.
ADD kubeflow /usr/local/src
ADD docker/ks/*.sh /usr/local/src/
WORKDIR /usr/local/src

View File

@ -7,20 +7,20 @@
{
"id": "build",
"name": "gcr.io/cloud-builders/docker",
"args": ["build", "-t", "gcr.io/kubeflow-examples/code-search/index_updater:" + std.extVar("tag"),
"--label=git-versions=" + std.extVar("gitVersion"),
"--file=docker/index_updater/Dockerfile",
"args": ["build", "-t", "gcr.io/kubeflow-examples/code-search/index_updater:" + std.extVar("tag"),
"--label=git-versions=" + std.extVar("gitVersion"),
"--file=docker/index_updater/Dockerfile",
"."],
},
{
"id": "tag",
"name": "gcr.io/cloud-builders/docker",
"args": ["tag", "gcr.io/kubeflow-examples/code-search/index_updater:" + std.extVar("tag"),
"args": ["tag", "gcr.io/kubeflow-examples/code-search/index_updater:" + std.extVar("tag"),
"gcr.io/kubeflow-examples/code-search/index_updater:latest",],
"waitFor": ["build"],
},
},
],
"images": ["gcr.io/kubeflow-examples/code-search/index_updater:" + std.extVar("tag"),
"gcr.io/kubeflow-examples/code-search/index_updater:latest",
"images": ["gcr.io/kubeflow-examples/code-search/index_updater:" + std.extVar("tag"),
"gcr.io/kubeflow-examples/code-search/index_updater:latest",
],
}

View File

@ -0,0 +1,49 @@
#!/bin/bash
# This script is a wrapper script for calling search index creator ksonnet component
# and creates a kubernetes job to compute search index.
# For more details about search index ksonnet component, check
# https://github.com/kubeflow/examples/blob/master/code_search/kubeflow/components/search-index-creator.jsonnet
set -ex

DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" > /dev/null && pwd)"

# Providing a negative value to "kubectl wait" makes it wait for a week,
# which is effectively "no timeout" for this job.
timeout="-1s"

# Ksonnet Environment name. Always use pipeline
ksEnvName="pipeline"

# Search index creator ksonnet component name
component="search-index-creator"

usage() {
  echo "Usage: launch_search_index_creator_job.sh --workingDir=<working dir> --workflowId=<workflow id invoking the container>
 --dataDir=<data dir> --timeout=<timeout> --namespace=<kubernetes namespace> --cluster=<cluster to deploy job to> "
}

# List of required parameters; parse_arguments.sh validates that each one was
# supplied as a --name=value flag and sets it as a shell variable.
names=(workingDir workflowId dataDir namespace cluster)

source "${DIR}/parse_arguments.sh"

# Configure kubectl to talk to the underlying cluster using the pod's
# mounted service-account token.
kubectl config set-cluster "${cluster}" --server=https://kubernetes.default --certificate-authority=/var/run/secrets/kubernetes.io/serviceaccount/ca.crt
kubectl config set-credentials pipeline --token "$(cat /var/run/secrets/kubernetes.io/serviceaccount/token)"
kubectl config set-context kubeflow --cluster "${cluster}" --user pipeline
kubectl config use-context kubeflow
ks env set "${ksEnvName}" --namespace="${namespace}"

# Apply parameters. All expansions are quoted so values containing spaces or
# glob characters are passed through intact.
ks param set "${component}" dataDir "${dataDir}" --env "${ksEnvName}"
ks param set "${component}" jobNameSuffix "${workflowId}" --env "${ksEnvName}"
ks param set "${component}" lookupFile "${workingDir}/code-embeddings-index/${workflowId}/embedding-to-info.csv" --env "${ksEnvName}"
ks param set "${component}" indexFile "${workingDir}/code-embeddings-index/${workflowId}/embeddings.index" --env "${ksEnvName}"
ks apply "${ksEnvName}" -c "${component}"

JOB_NAME="pipeline-create-search-index-${workflowId}"
echo "wait for ${JOB_NAME} to finish"
kubectl wait --timeout="${timeout}" --for=condition=complete "job/${JOB_NAME}" -n "${namespace}"

# If the wait above failed, then the script will fail fast and following command won't run.
# TODO complete doesn't mean it's successful. Check the job succeeded.
echo "${JOB_NAME} is succeeded"

View File

@ -0,0 +1,39 @@
#!/bin/bash
# Common logic to parse the arguments.
# To use it, define the usage() function for the help message and a `names`
# array variable listing the required arguments in the parent script, then
# `source` this file.

# parseArgs --{name}={value}... --{flag}...
# Sets the shell variable {name} to {value} for each --{name}={value}
# argument, and {flag}=true for each bare --{flag} argument.
parseArgs() {
  # Parse all command line options
  while [[ $# -gt 0 ]]; do
    # Parameters should be of the form
    # --{name}=${value}
    echo parsing "$1"
    if [[ $1 =~ ^--(.*)=(.*)$ ]]; then
      name=${BASH_REMATCH[1]}
      value=${BASH_REMATCH[2]}
      # printf -v assigns the value verbatim. The previous
      # `eval ${name}="${value}"` re-parsed the value as shell code, which
      # broke on values containing spaces and allowed command injection.
      printf -v "${name}" '%s' "${value}"
    elif [[ $1 =~ ^--(.*)$ ]]; then
      name=${BASH_REMATCH[1]}
      printf -v "${name}" '%s' true
    else
      echo "Argument $1 did not match the pattern --{name}={value} or --{name}"
    fi
    shift
  done
}

# "$@" keeps each argument as one word; the previous `$*` re-split arguments
# containing spaces.
parseArgs "$@"

# Verify every required parameter (from the caller's `names` array) was set.
missingParam=false
for i in "${names[@]}"; do
  if [ -z "${!i}" ]; then
    echo "--${i} not set"
    missingParam=true
  fi
done

if ${missingParam}; then
  usage
  exit 1
fi

View File

@ -14,33 +14,14 @@ set -ex
DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" > /dev/null && pwd)"
parseArgs() {
# Parse all command line options
while [[ $# -gt 0 ]]; do
# Parameters should be of the form
# --{name}=${value}
echo parsing "$1"
if [[ $1 =~ ^--(.*)=(.*)$ ]]; then
name=${BASH_REMATCH[1]}
value=${BASH_REMATCH[2]}
eval ${name}="${value}"
elif [[ $1 =~ ^--(.*)$ ]]; then
name=${BASH_REMATCH[1]}
value=true
eval ${name}="${value}"
else
echo "Argument $1 did not match the pattern --{name}={value} or --{name}"
fi
shift
done
}
usage() {
echo "Usage: update_index.sh --base=OWNER:branch --appDir=<ksonnet app dir> --env=<ksonnet environment> --indexFile=<index file> --lookupFile=<lookup file>"
}
parseArgs $*
# List of required parameters
names=(appDir env lookupFile indexFile base)
source "${DIR}/parse_arguments.sh"
if [ ! -z ${help} ]; then
usage
@ -50,22 +31,6 @@ if [ -z ${dryrun} ]; then
dryrun=false
fi
# List of required parameters
names=(appDir env lookupFile indexFile base)
missingParam=false
for i in ${names[@]}; do
if [ -z ${!i} ]; then
echo "--${i} not set"
missingParam=true
fi
done
if ${missingParam}; then
usage
exit 1
fi
cd ${appDir}
ks param set --env=${env} search-index-server indexFile ${indexFile}
ks param set --env=${env} search-index-server lookupFile ${lookupFile}

View File

@ -25,5 +25,8 @@
problem: "kf_github_function_docstring",
// modelBasePath shouldn't have integer in it.
modelBasePath: "gs://code-search-demo/models/20181107-dist-sync-gpu/export/",
lookupFile: "null",
indexFile: "null",
},
}

View File

@ -79,11 +79,9 @@
},
"search-index-creator": {
name: "search-index-creator",
jobNameSuffix: "",
jobNameSuffix: "null",
image: $.components["t2t-job"].dataflowImage,
dataDir: $.components["t2t-code-search"].workingDir + "/data",
lookupFile: $.components["t2t-code-search"].workingDir + "/code_search_index.csv",
indexFile: $.components["t2t-code-search"].workingDir + "/code_search_index.nmslib",
},
"submit-preprocess-job": {
name: "submit-preprocess-job",

View File

@ -0,0 +1,12 @@
To run the pipeline, follow the Kubeflow Pipelines instructions: compile index_update_pipeline.py and upload the
result on the pipelines page.
Provide the parameters, e.g.
```
PROJECT='code-search-demo'
CLUSTER_NAME='cs-demo-1103'
WORKING_DIR='gs://code-search-demo/pipeline'
SAVED_MODEL_DIR='gs://code-search-demo/models/20181107-dist-sync-gpu/export/1541712907/'
DATA_DIR='gs://code-search-demo/20181104/data'
```

View File

@ -0,0 +1,142 @@
from typing import Dict
from kubernetes import client as k8s_client
import kfp.dsl as dsl
# disable max arg lint check
#pylint: disable=R0913
def default_gcp_op(name: str, image: str, command: str = None,
                   arguments: str = None, file_inputs: Dict[dsl.PipelineParam, str] = None,
                   file_outputs: Dict[str, str] = None, is_exit_handler=False):
  """An operator that mounts the default GCP service account to the container.

  The user-gcp-sa secret is created as part of the kubeflow deployment that
  stores the access token for kubeflow user service account.
  With this service account, the container has a range of GCP APIs to
  access to. This service account is automatically created as part of the
  kubeflow deployment.
  For the list of the GCP APIs this service account can access to, check
  https://github.com/kubeflow/kubeflow/blob/7b0db0d92d65c0746ac52b000cbc290dac7c62b1/deployment/gke/deployment_manager_configs/iam_bindings_template.yaml#L18
  If you want to call the GCP APIs in a different project, grant the kf-user
  service account access permission.
  """
  op = dsl.ContainerOp(
      name,
      image,
      command,
      arguments,
      file_inputs,
      file_outputs,
      is_exit_handler,
  )
  # Surface the user-gcp-sa secret as a volume in the step's pod.
  op = op.add_volume(
      k8s_client.V1Volume(
          name='gcp-credentials',
          secret=k8s_client.V1SecretVolumeSource(
              secret_name='user-gcp-sa')))
  # Mount the secret where the key file is expected.
  op = op.add_volume_mount(
      k8s_client.V1VolumeMount(
          mount_path='/secret/gcp-credentials',
          name='gcp-credentials'))
  # Point Google client libraries at the mounted service-account key.
  return op.add_env_variable(
      k8s_client.V1EnvVar(
          name='GOOGLE_APPLICATION_CREDENTIALS',
          value='/secret/gcp-credentials/user-gcp-sa.json'))
def dataflow_function_embedding_op(
    project: 'GcpProject', runner: str, target_dataset: str, problem: str,
    data_dir: 'GcsUri', saved_model_dir: 'GcsUri', temp_location: 'GcsUri',
    staging_location: 'GcsUri',
    job_name: str, worker_machine_type: str,
    num_workers: int, step_name='dataflow_function_embedding'):
  """Returns a pipeline step that computes function embeddings via Dataflow.

  The step runs the create_function_embeddings CLI from the dataflow image
  with GCP credentials mounted (see default_gcp_op).
  """
  # Module invocation for the Dataflow job driver.
  entrypoint = [
      'python2',
      '-m',
      'code_search.dataflow.cli.create_function_embeddings',
  ]
  # CLI flags forwarded to the driver; --wait_until_finished makes the step
  # block until the Dataflow job completes.
  cli_flags = [
      '--project', project,
      '--runner', runner,
      '--target_dataset', target_dataset,
      '--problem', problem,
      '--data_dir', data_dir,
      '--saved_model_dir', saved_model_dir,
      '--job_name', job_name,
      '--temp_location', temp_location,
      '--staging_location', staging_location,
      '--worker_machine_type', worker_machine_type,
      '--num_workers', num_workers,
      '--requirements_file', 'requirements.dataflow.txt',
      '--wait_until_finished',
  ]
  return default_gcp_op(
      name=step_name,
      image='gcr.io/kubeflow-examples/code-search-dataflow:latest',
      command=entrypoint,
      arguments=cli_flags)
def search_index_creator_op(
    working_dir: str, data_dir: str, workflow_id: str, cluster_name: str, namespace: str):
  """Returns a pipeline step that launches the search-index-creator job.

  The step runs the ks launcher script, which creates a Kubernetes job in the
  given cluster/namespace to build the search index.
  """
  # Flags consumed by launch_search_index_creator_job.sh (--name=value form).
  script_flags = [
      '--workingDir=%s' % working_dir,
      '--dataDir=%s' % data_dir,
      '--workflowId=%s' % workflow_id,
      '--cluster=%s' % cluster_name,
      '--namespace=%s' % namespace,
  ]
  return dsl.ContainerOp(
      # use component name as step name
      name='search_index_creator',
      image='gcr.io/kubeflow-examples/code-search-ks:v20181126-e62ebca-dirty-4103da',
      command=['/usr/local/src/launch_search_index_creator_job.sh'],
      arguments=script_flags)
# The pipeline definition
@dsl.pipeline(
    name='function_embedding',
    description='Example function embedding pipeline'
)
def function_embedding_update(
    project,
    working_dir,
    data_dir,
    saved_model_dir,
    cluster_name,
    namespace,
    problem=dsl.PipelineParam(
        name='problem', value='kf_github_function_docstring'),
    # Fixed typo: this parameter was previously misspelled 'runnder' in the
    # pipeline UI.
    runner=dsl.PipelineParam(name='runner', value='DataflowRunner'),
    target_dataset=dsl.PipelineParam(
        name='target-dataset', value='code_search'),
    worker_machine_type=dsl.PipelineParam(
        name='worker-machine-type', value='n1-highcpu-32'),
    num_workers=dsl.PipelineParam(name='num-workers', value=5)):
  """Computes function embeddings on Dataflow, then rebuilds the search index.

  Step 1 runs the Dataflow function-embedding job; step 2 launches the
  search-index-creator Kubernetes job and is ordered after step 1.
  """
  # Argo substitutes the workflow name at run time; it namespaces all
  # per-run outputs under working_dir.
  workflow_name = '{{workflow.name}}'
  temp_location = '%s/dataflow/%s/temp' % (working_dir, workflow_name)
  staging_location = '%s/dataflow/%s/staging' % (working_dir, workflow_name)
  function_embedding = dataflow_function_embedding_op(
      project, runner, target_dataset, problem, data_dir,
      saved_model_dir, temp_location, staging_location, workflow_name,
      worker_machine_type, num_workers)
  # The index can only be built once the embeddings exist.
  search_index_creator_op(
      working_dir, data_dir, workflow_name, cluster_name, namespace).after(function_embedding)
if __name__ == '__main__':
  import kfp.compiler as compiler
  # Compile the pipeline into an Argo workflow archive next to this file,
  # ready to upload to the Kubeflow Pipelines UI.
  compiler.Compiler().compile(function_embedding_update, __file__ + '.tar.gz')