Integrate batch prediction (#184)

* Refactor the dataflow package

* Create placeholder for new prediction pipeline

* [WIP] add dofn for encoding

* Merge all modules under single package

* Pipeline data flow complete, wip prediction values

* Fallback to custom commands for extra dependency

* Working Dataflow runner installs, separate docker-related folder

* [WIP] Updated local user journey in README, fully working commands, easy container translation

* Working Batch Predictions.

* Remove docstring embeddings

* Complete batch prediction pipeline

* Update Dockerfiles and T2T Ksonnet components

* Fix linting

* Downgrade runtime to Python2, wip memory issues so use lesser data

* Pin master to index 0.

* Working batch prediction pipeline

* Modular Github Batch Prediction Pipeline, stores back to BigQuery

* Fix lint errors

* Fix module-wide imports, pin batch-prediction version

* Fix relative import, update docstrings

* Add references to issue and current workaround for Batch Prediction dependency.
Sanyam Kapoor 2018-07-23 16:26:23 -07:00 committed by k8s-ci-robot
parent 38b3259dc1
commit 636cf1c3d0
44 changed files with 877 additions and 454 deletions


@ -0,0 +1,4 @@
**/.git
**/.DS_Store
**/node_modules
**/kubeflow


@ -3,16 +3,19 @@
This demo implements End-to-End Semantic Code Search on Kubeflow. It is based on the public
Github Dataset hosted on BigQuery.

## Setup

### Prerequisites

* Python 2.7 (with `pip`)
* Python `virtualenv`
* Node
* Docker
* Ksonnet

**NOTE**: `Apache Beam` lacks `Python3` support and hence the version requirement.

### Google Cloud Setup

* Install [`gcloud`](https://cloud.google.com/sdk/gcloud/) CLI
@ -28,96 +31,184 @@ $ gcloud services enable dataflow.googleapis.com
* Create a Google Cloud Project and Google Storage Bucket (see the example below).

* Authenticate with Google Container Registry to push Docker images

```
$ gcloud auth configure-docker
```

See [Google Cloud Docs](https://cloud.google.com/docs/) for more.
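For instance, the bucket can be created with `gsutil` once a project exists (a sketch; `my-project` and `my-bucket` are placeholder names):

```
$ gcloud config set project my-project
$ gsutil mb -p my-project gs://my-bucket
```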
### Create Kubernetes Secrets

This is needed for deployed pods in the Kubernetes cluster to access Google Cloud resources.

```
$ PROJECT=my-project ./create_secrets.sh
```

**NOTE**: Use `create_secrets.sh -d` to remove any side-effects of the above step.
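To confirm the secrets exist in the cluster, a quick check with `kubectl` works (a sketch; the exact secret names are created by `create_secrets.sh`):

```
$ kubectl get secrets
```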
### Python Environment Setup

This demo needs multiple Python versions and `virtualenv` is an easy way to
create isolated environments.

```
$ virtualenv -p $(which python2) env2.7
```

This creates an `env2.7` environment folder. To use the environment,

```
$ source env2.7/bin/activate
```

See [Virtualenv Docs](https://virtualenv.pypa.io/en/stable/) for more.

**NOTE**: The `env2.7` environment must be activated for all subsequent steps.

### Python Dependencies

To install dependencies, run the following commands

```
(env2.7) $ pip install https://github.com/kubeflow/batch-predict/tarball/master
(env2.7) $ pip install src/
```

This will install everything needed to run the demo code.
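As a quick sanity check (a sketch; these console scripts are registered by `src/setup.py`), the installed entry points should now respond to `--help`:

```
(env2.7) $ code-search-preprocess --help
(env2.7) $ code-search-predict --help
```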
### Node Dependencies

```
$ pushd ui && npm i && popd
```

### Build and Push Docker Images

The `docker` directory contains a Dockerfile for each target application, each with its own `build.sh`. This is needed
to run the training jobs in the Kubeflow cluster.

To build the Docker image for training jobs

```
$ ./docker/t2t/build.sh
```

To build the Docker image for the Code Search UI

```
$ ./docker/ui/build.sh
```

Optionally, to push these images to GCR, one must export the `PROJECT=<my project name>` environment variable
and use the appropriate build script, as shown in the example below.

See [GCR Pushing and Pulling Images](https://cloud.google.com/container-registry/docs/pushing-and-pulling) for more.
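For example, a combined build-and-push run might look like this (a sketch; `my-project` is a placeholder for your GCP project ID):

```
$ export PROJECT=my-project
$ ./docker/t2t/build.sh          # CPU training image
$ GPU=1 ./docker/t2t/build.sh    # optional GPU variant
$ ./docker/ui/build.sh           # UI image
```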
# Pipeline

## 1. Data Pre-processing

This step takes in the public Github dataset and generates function and docstring token pairs.
Results are saved back into a BigQuery table. It is done via a `Dataflow` job.

```
(env2.7) $ export GCS_DIR=gs://kubeflow-examples/t2t-code-search
(env2.7) $ code-search-preprocess -r DataflowRunner -o code_search:function_docstrings \
    -p kubeflow-dev -j process-github-archive --storage-bucket ${GCS_DIR} \
    --machine-type n1-highcpu-32 --num-workers 16 --max-num-workers 16
```
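Once the Dataflow job finishes, the output table can be inspected with the `bq` CLI (a sketch; the dataset and table come from the `-o` flag above):

```
$ bq query --use_legacy_sql=false \
    'SELECT COUNT(*) FROM `code_search.function_docstrings`'
```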
## 2. Model Training

We use `tensor2tensor` to train our model.

```
(env2.7) $ t2t-trainer --generate_data --problem=github_function_docstring --model=similarity_transformer --hparams_set=transformer_tiny \
    --data_dir=${GCS_DIR}/data --output_dir=${GCS_DIR}/output \
    --train_steps=100 --eval_steps=10 \
    --t2t_usr_dir=src/code_search/t2t
```

A `Dockerfile` based on TensorFlow is provided which has all the dependencies for this part of the pipeline.
By default, it is based off the TensorFlow CPU 1.8.0 image for `Python2` but this can be overridden in the Docker image build.
The accompanying build script builds and pushes the Docker image to Google Container Registry.

## 3. Model Export

We use `t2t-exporter` to export the trained model above into the TensorFlow `SavedModel` format.

```
(env2.7) $ t2t-exporter --problem=github_function_docstring --model=similarity_transformer --hparams_set=transformer_tiny \
    --data_dir=${GCS_DIR}/data --output_dir=${GCS_DIR}/output \
    --t2t_usr_dir=src/code_search/t2t
```

## 4. Batch Prediction for Code Embeddings

We run another `Dataflow` pipeline which uses the exported model above to compute a high-dimensional embedding of each of
our code examples. Specify the model version (which is a UNIX timestamp) from the output directory. This should be the name of
a folder at the path `${GCS_DIR}/output/export/Servo`.
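The available versions can be listed straight from the export directory with `gsutil` (a sketch):

```
(env2.7) $ gsutil ls "${GCS_DIR}/output/export/Servo/"
```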
```
(env2.7) $ export MODEL_VERSION=<put_unix_timestamp_here>
```

Now, start the job,

```
(env2.7) $ export SAVED_MODEL_DIR=${GCS_DIR}/output/export/Servo/${MODEL_VERSION}
(env2.7) $ code-search-predict -r DataflowRunner --problem=github_function_docstring -i "${GCS_DIR}/data/*.csv" \
    --data-dir "${GCS_DIR}/data" --saved-model-dir "${SAVED_MODEL_DIR}" \
    -p kubeflow-dev -j batch-predict-github-archive --storage-bucket ${GCS_DIR} \
    --machine-type n1-highcpu-32 --num-workers 16 --max-num-workers 16
```

## 5. Create an NMSLib Index

Using the above embeddings, we will now create an NMSLib index which will serve as our search index for
new incoming queries.

```
(env2.7) $ export INDEX_FILE= # TODO(sanyamkapoor): Add the index file
(env2.7) $ nmslib-create --data-file=${EMBEDDINGS_FILE} --index-file=${INDEX_FILE}
```

## 6. Run a TensorFlow Serving container

This will start a TF Serving container using the model export above and expose it on port 8501.

```
$ docker run --rm -p8501:8501 gcr.io/kubeflow-images-public/tensorflow-serving-1.8 tensorflow_model_server \
    --rest_api_port=8501 --model_name=t2t_code_search --model_base_path=${GCS_DIR}/output/export/Servo
```
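The REST endpoint accepts the same request shape that the search server sends it (a sketch; the `b64` value is a placeholder for a base64-encoded, encoder-processed query):

```
$ curl -X POST http://localhost:8501/v1/models/t2t_code_search:predict \
    -H "Content-Type: application/json" \
    -d '{"instances": [{"input": {"b64": "<base64-encoded-query>"}}]}'
```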
## 7. Serve the Search Engine

We will now serve the search engine via a simple REST interface

```
(env2.7) $ nmslib-serve --serving-url=http://localhost:8501/v1/models/t2t_code_search:predict \
    --problem=github_function_docstring --data-dir=${GCS_DIR}/data --index-file=${INDEX_FILE}
```

## 8. Serve the UI

This will serve as the graphical wrapper on top of the REST search engine started in the previous step.

```
$ pushd ui && npm run build && popd
$ serve -s ui/build
```

# Pipeline on Kubeflow

TODO

# Acknowledgements


@ -1,27 +0,0 @@
# NOTE: The context for this build must be the `app` directory
ARG BASE_IMAGE_TAG=1.8.0-py3
FROM tensorflow/tensorflow:$BASE_IMAGE_TAG
ADD . /app
WORKDIR /app
ENV T2T_USR_DIR=/app/code_search/t2t
RUN pip3 --no-cache-dir install . &&\
apt-get update && apt-get install -y jq &&\
rm -rf /var/lib/apt/lists/* &&\
ln -s /app/t2t-entrypoint.sh /usr/local/sbin/t2t-entrypoint
# TODO(sanyamkapoor): A workaround for tensorflow/tensor2tensor#879
RUN apt-get update && apt-get install -y curl python &&\
curl https://sdk.cloud.google.com | bash &&\
rm -rf /var/lib/apt/lists/*
VOLUME ["/data", "/output"]
EXPOSE 8008
ENTRYPOINT ["bash"]


@ -1,2 +0,0 @@
from . import function_docstring
from . import similarity_transformer


@ -1,23 +0,0 @@
from setuptools import setup, find_packages
with open('requirements.txt', 'r') as f:
install_requires = f.readlines()
VERSION = '0.0.1'
setup(name='code-search',
description='Kubeflow Code Search Demo',
url='https://www.github.com/kubeflow/examples',
author='Sanyam Kapoor',
author_email='sanyamkapoor@google.com',
version=VERSION,
license='MIT',
packages=find_packages(),
install_requires=install_requires,
extras_require={},
entry_points={
'console_scripts': [
'nmslib-serve=code_search.nmslib.cli:server',
'nmslib-create=code_search.nmslib.cli:creator',
]
})


@ -1,34 +0,0 @@
#!/usr/bin/env bash
##
# This script builds and pushes a Docker image containing
# "app" to Google Container Registry. It automatically tags
# a unique image for every run.
#
set -ex
PROJECT=${PROJECT:-}
if [[ -z "${PROJECT}" ]]; then
echo "PROJECT environment variable missing!"
exit 1
fi
GPU=${GPU:-0}
BASE_IMAGE_TAG=$([[ "${GPU}" = "1" ]] && echo "1.8.0-gpu-py3" || echo "1.8.0-py3")
BUILD_IMAGE_TAG="code-search:v$(date +%Y%m%d)$([[ ${GPU} = "1" ]] && echo '-gpu' || echo '')-$(python3 -c 'import uuid; print(uuid.uuid4().hex[:7]);')"
# Directory of this script used as docker context
_SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
pushd "${_SCRIPT_DIR}/app"
docker build -t ${BUILD_IMAGE_TAG} --build-arg BASE_IMAGE_TAG=${BASE_IMAGE_TAG} .
# Push image to GCR if PROJECT is available
docker tag ${BUILD_IMAGE_TAG} gcr.io/${PROJECT}/${BUILD_IMAGE_TAG}
docker push gcr.io/${PROJECT}/${BUILD_IMAGE_TAG}
popd


@ -0,0 +1,20 @@
ARG BASE_IMAGE_TAG=1.8.0
FROM tensorflow/tensorflow:$BASE_IMAGE_TAG
RUN pip --no-cache-dir install tensor2tensor~=1.6.0 oauth2client~=4.1.0 &&\
apt-get update && apt-get install -y jq &&\
rm -rf /var/lib/apt/lists/*
ADD src/code_search/t2t /app/code_search/t2t
ADD docker/t2t/t2t-entrypoint.sh /usr/local/sbin/t2t-entrypoint
WORKDIR /app
ENV PYTHONIOENCODING=utf-8 T2T_USR_DIR=/app/code_search/t2t
VOLUME ["/data", "/output"]
EXPOSE 8008
ENTRYPOINT ["bash"]

code_search/docker/t2t/build.sh Executable file

@ -0,0 +1,30 @@
#!/usr/bin/env bash
##
# This script builds and pushes a Docker image containing
# the T2T problem to Google Container Registry. It automatically tags
# a unique image for every run.
#
set -ex
GPU=${GPU:-0}
BASE_IMAGE_TAG=$([[ "${GPU}" = "1" ]] && echo "1.8.0-gpu" || echo "1.8.0")
BUILD_IMAGE_UUID=$(python3 -c 'import uuid; print(uuid.uuid4().hex[:7]);')
BUILD_IMAGE_TAG="code-search:v$(date +%Y%m%d)$([[ ${GPU} = "1" ]] && echo '-gpu' || echo '')-${BUILD_IMAGE_UUID}"
# Directory of this script used for path references
_SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
pushd "${_SCRIPT_DIR}"
docker build -f "${_SCRIPT_DIR}/Dockerfile" -t ${BUILD_IMAGE_TAG} --build-arg BASE_IMAGE_TAG=${BASE_IMAGE_TAG} "${_SCRIPT_DIR}/../.."
# Push image to GCR if PROJECT is available
PROJECT=${PROJECT:-}
if [[ ! -z "${PROJECT}" ]]; then
docker tag ${BUILD_IMAGE_TAG} gcr.io/${PROJECT}/${BUILD_IMAGE_TAG}
docker push gcr.io/${PROJECT}/${BUILD_IMAGE_TAG}
fi
popd


@ -16,7 +16,7 @@ TF_CONFIG=${TF_CONFIG:-}
if [[ ! -z "${TF_CONFIG}" ]]; then
WORKER_ID=$(echo "${TF_CONFIG}" | jq ".task.index")
WORKER_TYPE=$(echo "${TF_CONFIG}" | jq -r ".task.type")
MASTER_INSTANCE=$(echo "${TF_CONFIG}" | jq -r ".cluster.master[0]")
if [[ "${TARGET_BIN}" = "t2t-trainer" ]]; then
TARGET_BIN_OPTS="${TARGET_BIN_OPTS} --master=grpc://${MASTER_INSTANCE} --worker_id=${WORKER_ID}"


@ -0,0 +1,13 @@
FROM node:10.6
ADD ui/ /ui
WORKDIR /ui
RUN npm i && npm run build && npm i -g serve
EXPOSE 5000
ENTRYPOINT ["serve"]
CMD ["-l", "5000", "-n", "/ui/build"]

code_search/docker/ui/build.sh Executable file

@ -0,0 +1,28 @@
#!/usr/bin/env bash
##
# This script builds and pushes a Docker image containing
# the Code Search UI to Google Container Registry. It automatically tags
# a unique image for every run.
#
set -ex
BUILD_IMAGE_UUID=$(python3 -c 'import uuid; print(uuid.uuid4().hex[:7]);')
BUILD_IMAGE_TAG="code-search-ui:v$(date +%Y%m%d)-${BUILD_IMAGE_UUID}"
# Directory of this script used for path references
_SCRIPT_DIR="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"
pushd "${_SCRIPT_DIR}"
docker build -f "${_SCRIPT_DIR}/Dockerfile" -t ${BUILD_IMAGE_TAG} "${_SCRIPT_DIR}/../.."
# Push image to GCR if PROJECT is available
PROJECT=${PROJECT:-}
if [[ ! -z "${PROJECT}" ]]; then
docker tag ${BUILD_IMAGE_TAG} gcr.io/${PROJECT}/${BUILD_IMAGE_TAG}
docker push gcr.io/${PROJECT}/${BUILD_IMAGE_TAG}
fi
popd


@ -18,18 +18,52 @@
train_steps: 100,
eval_steps: 10,
image: "gcr.io/kubeflow-dev/code-search:v20180719-f04a4b7",
imageGpu: "gcr.io/kubeflow-dev/code-search:v20180719-gpu-9b8b4a8",
imagePullSecrets: [],
dataDir: "null",
outputDir: "null",
model: "null",
hparams_set: "null",
},
"t2t-code-search-trainer": {
jobType: "trainer",
numWorker: 2,
numPs: 1,
// numWorkerGpu: 1,
// numPsGpu: 1,
name: "t2t-code-search-trainer",
problem: "github_function_docstring",
dataDir: "gs://kubeflow-examples/t2t-code-search/data",
outputDir: "gs://kubeflow-examples/t2t-code-search/output",
model: "similarity_transformer",
hparams_set: "transformer_tiny",
},
"t2t-code-search-exporter": {
jobType: "exporter",
name: "t2t-code-search-exporter",
problem: "github_function_docstring",
dataDir: "gs://kubeflow-examples/t2t-code-search/data",
outputDir: "gs://kubeflow-examples/t2t-code-search/output",
model: "similarity_transformer",
hparams_set: "transformer_tiny",
},
"t2t-code-search-serving": {
name: "t2t-code-search",
modelName: "t2t_code_search",
modelPath: "gs://kubeflow-examples/t2t-code-search/output/export/Servo",
modelServerImage: "gcr.io/kubeflow-images-public/tensorflow-serving-1.8:latest",
cloud: "gcp",
gcpCredentialSecretName: "gcp-credentials",
},
"nmslib": { "nmslib": {
@ -44,54 +78,6 @@
servingUrl: null,
},
"t2t-translate-datagen": {
jobType: "datagen",
name: "translate-ende-wmt32k-datagen",
problem: "translate_ende_wmt32k",
// TODO(sanyamkapoor): A workaround for tensorflow/tensor2tensor#879
dataDir: "/data",
outputDir: "/data",
gsOutputDir: "gs://kubeflow-examples/t2t-translate/translate_ende_wmt32k/datagen",
},
"t2t-translate-exporter": {
jobType: "exporter",
name: "translate-ende-wmt32k-exporter",
problem: "translate_ende_wmt32k",
dataDir: "gs://kubeflow-examples/t2t-translate/translate_ende_wmt32k/datagen",
outputDir: "gs://kubeflow-examples/t2t-translate/translate_ende_wmt32k/output",
model: "transformer",
hparams_set: "transformer_base_single_gpu",
},
"t2t-translate": {
jobType: "trainer",
numMaster: 1,
numWorker: 2,
numPs: 1,
numWorkerGpu: 1,
numPsGpu: 1,
name: "translate-ende-wmt32k",
problem: "translate_ende_wmt32k",
dataDir: "gs://kubeflow-examples/t2t-translate/translate_ende_wmt32k/datagen",
outputDir: "gs://kubeflow-examples/t2t-translate/translate_ende_wmt32k/output",
model: "transformer",
hparams_set: "transformer_base_single_gpu",
},
"t2t-translate-serving": {
name: "t2t-translate",
modelName: "t2t-translate",
modelPath: "gs://kubeflow-examples/t2t-translate/translate_ende_wmt32k/output/export/Servo",
modelServerImage: "gcr.io/kubeflow-images-public/tensorflow-serving-1.8:latest",
cloud: "gcp",
gcpCredentialSecretName: "gcp-credentials",
},
"nms-creator": { "nms-creator": {
name: "nms-creator", name: "nms-creator",
}, },
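These defaults can be overridden per component with the ksonnet CLI (a sketch, assuming the ksonnet app in this directory; parameter names follow the blocks above):

```
$ ks param set t2t-code-search-trainer train_steps 1000
$ ks param set t2t-code-search-trainer outputDir gs://my-bucket/t2t-code-search/output
```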


@ -2,6 +2,6 @@ local k = import "k.libsonnet";
local t2tJob = import "t2t-job.libsonnet";
local env = std.extVar("__ksonnet/environments");
local params = std.extVar("__ksonnet/params").components["t2t-code-search-exporter"];
std.prune(k.core.v1.list.new([t2tJob.parts(params, env).job]))


@ -1,5 +1,5 @@
local env = std.extVar("__ksonnet/environments");
local params = std.extVar("__ksonnet/params").components["t2t-code-search-serving"];
local k = import "k.libsonnet";


@ -2,6 +2,6 @@ local k = import "k.libsonnet";
local t2tJob = import "t2t-job.libsonnet";
local env = std.extVar("__ksonnet/environments");
local params = std.extVar("__ksonnet/params").components["t2t-code-search-trainer"];
std.prune(k.core.v1.list.new([t2tJob.parts(params, env).job]))
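The components can then be deployed with ksonnet (a sketch, assuming an environment named `default` and component files named after the parameter blocks they reference):

```
$ ks apply default -c t2t-code-search-trainer
$ ks apply default -c t2t-code-search-exporter
$ ks apply default -c t2t-code-search-serving
```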


@ -2,33 +2,6 @@ local tfJob = import "kubeflow/tf-job/tf-job.libsonnet";
local baseParams = std.extVar("__ksonnet/params").components["t2t-job"];
{
getGcloudAuthCmd()::
[
"/root/google-cloud-sdk/bin/gcloud",
"auth",
"activate-service-account",
"--key-file",
"$GOOGLE_APPLICATION_CREDENTIALS",
],
getGsUtilCmd(src_dir, dst_dir)::
[
"/root/google-cloud-sdk/bin/gsutil",
"cp",
"-r",
src_dir,
dst_dir,
],
wrapGsUtil(cmd, params):: {
local resultCmd =
(if params.gsDataDir == "null" && params.gsOutputDir == "null" then [] else $.getGcloudAuthCmd() + ["&&"]) +
(if params.gsDataDir == "null" then [] else $.getGsUtilCmd(params.gsDataDir, params.dataDir) + ["&&"]) +
cmd +
(if params.gsOutputDir == "null" then [] else ["&&"] + $.getGsUtilCmd(params.outputDir, params.gsOutputDir)),
result: ["-c", std.join(" ", resultCmd)]
}.result,
getDatagenCmd(params)::
[
"t2t-datagen",
@ -38,6 +11,7 @@ local baseParams = std.extVar("__ksonnet/params").components["t2t-job"];
getExporterCmd(params)::
[
"/usr/local/sbin/t2t-entrypoint",
"t2t-exporter",
"--problem=" + params.problem,
"--data_dir=" + params.dataDir,
@ -48,13 +22,14 @@ local baseParams = std.extVar("__ksonnet/params").components["t2t-job"];
getTrainerCmd(params):: {
local trainer = [
"/usr/local/sbin/t2t-entrypoint",
"t2t-trainer",
"--generate_data",
"--problem=" + params.problem,
"--data_dir=" + params.dataDir,
"--output_dir=" + params.outputDir,
"--model=" + params.model,
"--hparams_set=" + params.hparams_set,
"--train_steps=" + std.toString(params.train_steps),
],
@ -139,38 +114,24 @@ local baseParams = std.extVar("__ksonnet/params").components["t2t-job"];
},
],
// TODO(sanyamkapoor): A workaround for tensorflow/tensor2tensor#879
// once fixed, simply get rid of $.wrapGsUtil method
local cmd = $.getTrainerCmd(params),
local finalCmd = {
master: $.wrapGsUtil(["/usr/local/sbin/t2t-entrypoint"] + cmd.master, params),
worker: $.wrapGsUtil(["/usr/local/sbin/t2t-entrypoint"] + cmd.worker, params),
ps: $.wrapGsUtil(["/usr/local/sbin/t2t-entrypoint"] + cmd.ps, params),
},
local datagenCmd = $.wrapGsUtil(["/usr/local/sbin/t2t-entrypoint"] + $.getDatagenCmd(params), params),
local exporterCmd = $.wrapGsUtil(["/usr/local/sbin/t2t-entrypoint"] + $.getExporterCmd(params), params),
job::
tfJob.parts.tfJob(
params.name,
env.namespace,
if params.jobType == "exporter" then
[
$.tfJobReplica("MASTER", params.numMaster, $.getExporterCmd(params), workerImage, params.numWorkerGpu,
workerImagePullSecrets, workerEnv, workerVolumes, workerVolumeMounts),
]
else
[
$.tfJobReplica("MASTER", params.numMaster, cmd.master, workerImage, params.numWorkerGpu,
workerImagePullSecrets, workerEnv, workerVolumes, workerVolumeMounts),
$.tfJobReplica("WORKER", params.numWorker, cmd.worker, workerImage, params.numWorkerGpu,
workerImagePullSecrets, workerEnv, workerVolumes, workerVolumeMounts),
$.tfJobReplica("PS", params.numPs, cmd.ps, workerImage, params.numPsGpu,
workerImagePullSecrets, workerEnv, workerVolumes, workerVolumeMounts),
],
terminationPolicy


@ -1,7 +0,0 @@
local k = import "k.libsonnet";
local t2tJob = import "t2t-job.libsonnet";
local env = std.extVar("__ksonnet/environments");
local params = std.extVar("__ksonnet/params").components["t2t-translate"];
std.prune(k.core.v1.list.new([t2tJob.parts(params, env).job]))


@ -1,4 +0,0 @@
astor~=0.6.0
apache-beam[gcp]~=2.5.0
nltk~=3.3.0
spacy~=2.0.0


@ -1,45 +0,0 @@
from __future__ import print_function
import argparse
import os
import apache_beam as beam
from preprocess.pipeline import create_pipeline_opts, ProcessGithubFiles
def parse_arguments(args):
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
default_script_file = os.path.abspath('{}/../../files/select_github_archive.sql'.format(__file__))
parser.add_argument('-i', '--input', metavar='', type=str, default=default_script_file,
help='Path to BigQuery SQL script')
parser.add_argument('-o', '--output', metavar='', type=str,
help='Output string of the format <dataset>:<table>')
parser.add_argument('-p', '--project', metavar='', type=str, default='Project', help='Project ID')
parser.add_argument('-j', '--job-name', metavar='', type=str, default='Beam Job', help='Job name')
parser.add_argument('--storage-bucket', metavar='', type=str, default='gs://bucket',
help='Path to Google Storage Bucket')
parser.add_argument('--num-workers', metavar='', type=int, default=1, help='Number of workers')
parser.add_argument('--max-num-workers', metavar='', type=int, default=1,
help='Maximum number of workers')
parser.add_argument('--machine-type', metavar='', type=str, default='n1-standard-1',
help='Google Cloud Machine Type to use')
parsed_args = parser.parse_args(args)
return parsed_args
def main(args):
args = parse_arguments(args)
pipeline_opts = create_pipeline_opts(args)
with open(args.input, 'r') as f:
query_string = f.read()
pipeline = beam.Pipeline(options=pipeline_opts)
(pipeline | ProcessGithubFiles(args.project, query_string, args.output, args.storage_bucket)) #pylint: disable=expression-not-assigned
pipeline.run()
if __name__ == '__main__':
import sys
main(sys.argv[1:])


@ -0,0 +1,120 @@
"""Entrypoint for Dataflow jobs"""
from __future__ import print_function
import argparse
import os
import apache_beam as beam
import apache_beam.options.pipeline_options as pipeline_options
import code_search.transforms.process_github_files as process_github_files
import code_search.transforms.code_embed as code_embed
def create_pipeline_opts(args):
"""Create standard Pipeline Options for Beam"""
options = pipeline_options.PipelineOptions()
options.view_as(pipeline_options.StandardOptions).runner = args.runner
google_cloud_options = options.view_as(pipeline_options.GoogleCloudOptions)
google_cloud_options.project = args.project
if args.runner == 'DataflowRunner':
google_cloud_options.job_name = args.job_name
google_cloud_options.temp_location = '{}/temp'.format(args.storage_bucket)
google_cloud_options.staging_location = '{}/staging'.format(args.storage_bucket)
worker_options = options.view_as(pipeline_options.WorkerOptions)
worker_options.num_workers = args.num_workers
worker_options.max_num_workers = args.max_num_workers
worker_options.machine_type = args.machine_type
setup_options = options.view_as(pipeline_options.SetupOptions)
setup_options.setup_file = os.path.join(os.path.dirname(os.path.dirname(__file__)), 'setup.py')
return options
def parse_arguments(argv):
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('-r', '--runner', metavar='', type=str, default='DirectRunner',
help='Type of runner - DirectRunner or DataflowRunner')
parser.add_argument('-i', '--input', metavar='', type=str, default='',
help='Path to input file')
parser.add_argument('-o', '--output', metavar='', type=str,
help='Output string of the format <dataset>:<table>')
predict_args_parser = parser.add_argument_group('Batch Prediction Arguments')
predict_args_parser.add_argument('--problem', metavar='', type=str,
help='Name of the T2T problem')
predict_args_parser.add_argument('--data-dir', metavar='', type=str,
help='Path to directory of the T2T problem data')
predict_args_parser.add_argument('--saved-model-dir', metavar='', type=str,
help='Path to directory containing Tensorflow SavedModel')
# Dataflow related arguments
dataflow_args_parser = parser.add_argument_group('Dataflow Runner Arguments')
dataflow_args_parser.add_argument('-p', '--project', metavar='', type=str, default='Project',
help='Project ID')
dataflow_args_parser.add_argument('-j', '--job-name', metavar='', type=str, default='Beam Job',
help='Job name')
dataflow_args_parser.add_argument('--storage-bucket', metavar='', type=str, default='gs://bucket',
help='Path to Google Storage Bucket')
dataflow_args_parser.add_argument('--num-workers', metavar='', type=int, default=1,
help='Number of workers')
dataflow_args_parser.add_argument('--max-num-workers', metavar='', type=int, default=1,
help='Maximum number of workers')
dataflow_args_parser.add_argument('--machine-type', metavar='', type=str, default='n1-standard-1',
help='Google Cloud Machine Type to use')
parsed_args = parser.parse_args(argv)
return parsed_args
def create_github_pipeline(argv=None):
"""Creates the Github source code pre-processing pipeline.
This pipeline takes an SQL file for BigQuery as an input
and puts the results in a file and a new BigQuery table.
An SQL file is included with the module.
"""
args = parse_arguments(argv)
default_sql_file = os.path.abspath('{}/../../files/select_github_archive.sql'.format(__file__))
args.input = args.input or default_sql_file
pipeline_opts = create_pipeline_opts(args)
with open(args.input, 'r') as f:
query_string = f.read()
pipeline = beam.Pipeline(options=pipeline_opts)
(pipeline #pylint: disable=expression-not-assigned
| process_github_files.ProcessGithubFiles(args.project, query_string,
args.output, args.storage_bucket)
)
result = pipeline.run()
if args.runner == 'DirectRunner':
result.wait_until_finish()
def create_batch_predict_pipeline(argv=None):
"""Creates Batch Prediction Pipeline using trained model.
This pipeline takes in a collection of CSV files returned
by the Github Pipeline, embeds the code text using the
trained model in a given model directory.
"""
args = parse_arguments(argv)
pipeline_opts = create_pipeline_opts(args)
pipeline = beam.Pipeline(options=pipeline_opts)
(pipeline #pylint: disable=expression-not-assigned
| code_embed.GithubBatchPredict(args.project, args.problem,
args.data_dir, args.saved_model_dir)
)
result = pipeline.run()
if args.runner == 'DirectRunner':
result.wait_until_finish()
if __name__ == '__main__':
create_batch_predict_pipeline()
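For quick local iteration, both entry points also work with the `DirectRunner` (a sketch; `my-project` and the bucket are placeholders, and the pipeline then runs on the local machine):

```
(env2.7) $ code-search-preprocess -r DirectRunner -p my-project \
    -o code_search:function_docstrings --storage-bucket gs://my-bucket
```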


@ -0,0 +1,3 @@
from code_search.do_fns.github_files import ExtractFuncInfo
from code_search.do_fns.github_files import TokenizeCodeDocstring
from code_search.do_fns.github_files import SplitRepoPath


@ -0,0 +1,76 @@
"""Beam DoFns for prediction related tasks"""
import io
import csv
from cStringIO import StringIO
import apache_beam as beam
from code_search.transforms.process_github_files import ProcessGithubFiles
from code_search.t2t.query import get_encoder, encode_query
class GithubCSVToDict(beam.DoFn):
"""Split a text row and convert into a dict."""
def process(self, element): # pylint: disable=no-self-use
element = element.encode('utf-8')
row = StringIO(element)
reader = csv.reader(row, delimiter=',')
keys = ProcessGithubFiles.get_key_list()
values = next(reader) # pylint: disable=stop-iteration-return
result = dict(zip(keys, values))
yield result
class GithubDictToCSV(beam.DoFn):
"""Convert dictionary to writable CSV string."""
def process(self, element): # pylint: disable=no-self-use
element['function_embedding'] = ','.join(str(val) for val in element['function_embedding'])
target_keys = ['nwo', 'path', 'function_name', 'function_embedding']
target_values = [element[key].encode('utf-8') for key in target_keys]
with io.BytesIO() as fs:
cw = csv.writer(fs)
cw.writerow(target_values)
result_str = fs.getvalue().strip('\r\n')
return result_str
class EncodeExample(beam.DoFn):
"""Encode string to integer tokens.
This is needed so that the data can be sent in
for prediction
"""
def __init__(self, problem, data_dir):
super(EncodeExample, self).__init__()
self.problem = problem
self.data_dir = data_dir
def process(self, element):
encoder = get_encoder(self.problem, self.data_dir)
encoded_function = encode_query(encoder, element['function_tokens'])
element['instances'] = [{'input': {'b64': encoded_function}}]
yield element
class ProcessPrediction(beam.DoFn):
"""Process results from PredictionDoFn.
This class processes predictions from another
DoFn, to make sure it is a correctly formatted dict.
"""
def process(self, element): # pylint: disable=no-self-use
element['function_embedding'] = ','.join([
str(val) for val in element['predictions'][0]['outputs']
])
element.pop('function_tokens')
element.pop('instances')
element.pop('predictions')
yield element


@ -0,0 +1,79 @@
"""Beam DoFns for Github related tasks"""
import time
import logging
import apache_beam as beam
from apache_beam import pvalue
from apache_beam.metrics import Metrics
class SplitRepoPath(beam.DoFn):
# pylint: disable=abstract-method
"""Split the space-delimited file `repo_path` into owner repository (`nwo`)
and file path (`path`)"""
def process(self, element): # pylint: disable=no-self-use
nwo, path = element.pop('repo_path').split(' ', 1)
element['nwo'] = nwo
element['path'] = path
yield element
class TokenizeCodeDocstring(beam.DoFn):
# pylint: disable=abstract-method
"""Compute code/docstring pairs from incoming BigQuery row dict"""
def __init__(self):
super(TokenizeCodeDocstring, self).__init__()
self.tokenization_time_ms = Metrics.counter(self.__class__, 'tokenization_time_ms')
def process(self, element): # pylint: disable=no-self-use
try:
import code_search.utils as utils
start_time = time.time()
element['pairs'] = utils.get_function_docstring_pairs(element.pop('content'))
self.tokenization_time_ms.inc(int((time.time() - start_time) * 1000.0))
yield element
except Exception as e: #pylint: disable=broad-except
logging.warning('Tokenization failed, %s', e.message)
yield pvalue.TaggedOutput('err_rows', element)
class ExtractFuncInfo(beam.DoFn):
# pylint: disable=abstract-method
"""Convert pair tuples to dict.
This takes a list of values from `TokenizeCodeDocstring`
and converts it into a dictionary so that values can be
indexed by names instead of indices. `info_keys` is the
list of names of those values in order which will become
the keys of each new dict.
"""
def __init__(self, info_keys):
super(ExtractFuncInfo, self).__init__()
self.info_keys = info_keys
def process(self, element):
try:
info_rows = [dict(zip(self.info_keys, pair)) for pair in element.pop('pairs')]
info_rows = [self.merge_two_dicts(info_dict, element) for info_dict in info_rows]
info_rows = map(self.dict_to_unicode, info_rows)
yield info_rows
except Exception as e: #pylint: disable=broad-except
logging.warning('Function Info extraction failed, %s', e.message)
yield pvalue.TaggedOutput('err_rows', element)
@staticmethod
def merge_two_dicts(dict_a, dict_b):
result = dict_a.copy()
result.update(dict_b)
return result
@staticmethod
def dict_to_unicode(data_dict):
for k, v in data_dict.items():
if isinstance(v, str):
data_dict[k] = v.decode('utf-8', 'ignore')
return data_dict


@ -1,16 +1,13 @@
import json
import requests
import nmslib
from code_search.t2t.query import get_encoder, encode_query
class CodeSearchEngine:
"""This is a utility class which takes an nmslib
index file and a data file to return data from"""
def __init__(self, problem, data_dir, serving_url, index_file):
self._serving_url = serving_url
self._problem = problem
self._data_dir = data_dir
@ -25,7 +22,7 @@ class CodeSearchEngine:
This involves encoding the input query
for the TF Serving service
"""
encoder = get_encoder(self._problem, self._data_dir)
encoded_query = encode_query(encoder, query_str)
data = {"instances": [{"input": {"b64": encoded_query}}]}
@ -37,7 +34,7 @@ class CodeSearchEngine:
result['predictions'] = [preds['outputs'] for preds in result['predictions']]
return result
def query(self, query_str, k=2):
embedding = self.embed(query_str)
idxs, dists = self.index.knnQuery(embedding, k=k)
@ -56,7 +53,7 @@ class CodeSearchEngine:
return index
@staticmethod
def create_index(data, save_path):
"""Add numpy data to the index and save to path"""
index = CodeSearchEngine.nmslib_init()
index.addDataPointBatch(data)


@ -1,11 +1,10 @@
from flask import Flask, request, abort, jsonify, make_response
class CodeSearchServer:
"""This utility class wraps the search engine into
an HTTP server based on Flask"""
def __init__(self, engine, host='0.0.0.0', port=8008):
self.app = Flask(__name__)
self.host = host
self.port = port


@ -0,0 +1,7 @@
##
# NOTE: Keeping these imports relative as this module
# is needed for independent usage outside the `code_search`
# top level module
#
from . import function_docstring
from . import similarity_transformer


@ -1,5 +1,7 @@
"""Github function/text similatrity problems.""" """Github function/text similatrity problems."""
import csv import csv
import os
from cStringIO import StringIO
from tensor2tensor.data_generators import generator_utils from tensor2tensor.data_generators import generator_utils
from tensor2tensor.data_generators import translate from tensor2tensor.data_generators import translate
from tensor2tensor.utils import metrics from tensor2tensor.utils import metrics
@ -10,14 +12,12 @@ from tensor2tensor.utils import registry
# These URLs are only for fallback purposes in case the specified
# `data_dir` does not contain the data. However, note that the data
# files must have the same naming pattern.
# TODO: The memory is exploding, need to fix this.
#
_DATA_BASE_URL = 'gs://kubeflow-examples/t2t-code-search/data'
_GITHUB_FUNCTION_DOCSTRING_FILES = [
'pairs-0000{}-of-00010.csv'.format(i)
for i in range(1)
]
@ -41,18 +41,24 @@ class GithubFunctionDocstring(translate.TranslateProblem):
return _GITHUB_FUNCTION_DOCSTRING_FILES
def generate_samples(self, data_dir, tmp_dir, dataset_split): # pylint: disable=no-self-use,unused-argument
"""Returns a generator to return {"inputs": [text], "targets": [text]}.
If the `data_dir` is a GCS path, all data is downloaded to the
`tmp_dir`.
"""
download_dir = tmp_dir if data_dir.startswith('gs://') else data_dir
uri_base = data_dir if data_dir.startswith('gs://') else _DATA_BASE_URL
pair_csv_files = [
generator_utils.maybe_download(download_dir, filename, os.path.join(uri_base, filename))
for filename in self.source_data_files(dataset_split)
]
for pairs_file in pair_csv_files:
with open(pairs_file, 'r') as csv_file:
for line in csv_file:
reader = csv.reader(StringIO(line), delimiter=',')
function_tokens, docstring_tokens = next(reader)[-2:] # pylint: disable=stop-iteration-return
yield {'inputs': docstring_tokens, 'targets': function_tokens}
def eval_metrics(self): # pylint: disable=no-self-use


@ -4,15 +4,14 @@ from tensor2tensor.data_generators import text_encoder
from tensor2tensor.utils import registry
def get_encoder(problem_name, data_dir):
"""Get encoder from the T2T problem. This might
vary by problem, keeping generic as a reference
"""
problem = registry.problem(problem_name)
hparams = tf.contrib.training.HParams(data_dir=data_dir)
problem.get_hparams(hparams)
return problem.feature_info["inputs"].encoder
def encode_query(encoder, query_str):


@ -0,0 +1,86 @@
import apache_beam as beam
import apache_beam.io.gcp.internal.clients as clients
class BigQueryRead(beam.PTransform):
"""Wrapper over Apache Beam Big Query Read.
This is an abstract class and one must override
the `query_string` property to specify the query
string.
"""
def __init__(self, project):
super(BigQueryRead, self).__init__()
self.project = project
@property
def limit(self):
"""Limit for the query rows.
The `None` case should be handled when using
this property.
"""
return None
@property
def query_string(self):
raise NotImplementedError
def expand(self, input_or_inputs):
return (input_or_inputs
| beam.io.Read(beam.io.BigQuerySource(project=self.project,
query=self.query_string,
use_standard_sql=True))
)
class BigQueryWrite(beam.PTransform):
"""Wrapper over Apache Beam BigQuery Write.
This is an abstract class and one must override
the `column_list` property for valid write transform.
This property should return a list of tuples as
[
('column_name', 'column_type')
]
"""
def __init__(self, project, dataset, table, batch_size=500):
super(BigQueryWrite, self).__init__()
self.project = project
self.dataset = dataset
self.table = table
self.batch_size = batch_size
@property
def column_list(self):
raise NotImplementedError
@property
def output_schema(self):
return self.construct_schema(self.column_list)
def expand(self, input_or_inputs):
return (input_or_inputs
| beam.io.WriteToBigQuery(project=self.project,
dataset=self.dataset,
table=self.table,
schema=self.output_schema,
batch_size=self.batch_size)
)
@staticmethod
def construct_schema(column_list):
table_schema = clients.bigquery.TableSchema()
for column_name, column_type in column_list:
field_schema = clients.bigquery.TableFieldSchema()
field_schema.name = column_name
field_schema.type = column_type
field_schema.mode = 'nullable'
table_schema.fields.append(field_schema)
return table_schema


@ -0,0 +1,54 @@
import apache_beam as beam
import kubeflow_batch_predict.dataflow.batch_prediction as batch_prediction
import code_search.do_fns.embeddings as embeddings
import code_search.transforms.github_bigquery as github_bigquery
class GithubBatchPredict(beam.PTransform):
"""Batch Prediction for Github dataset"""
def __init__(self, project, problem, data_dir, saved_model_dir):
super(GithubBatchPredict, self).__init__()
self.project = project
self.problem = problem
self.data_dir = data_dir
self.saved_model_dir = saved_model_dir
##
# Target dataset and table to store prediction outputs.
# Non-configurable for now.
#
self.index_dataset = 'code_search'
self.index_table = 'search_index'
self.batch_size = 100
def expand(self, input_or_inputs):
rows = (input_or_inputs
| "Read Processed Github Dataset" >> github_bigquery.ReadProcessedGithubData(self.project)
)
batch_predict = (rows
| "Prepare Encoded Input" >> beam.ParDo(embeddings.EncodeExample(self.problem,
self.data_dir))
| "Execute Predictions" >> beam.ParDo(batch_prediction.PredictionDoFn(),
self.saved_model_dir).with_outputs("errors",
main="main")
)
predictions = batch_predict.main
formatted_predictions = (predictions
| "Process Predictions" >> beam.ParDo(embeddings.ProcessPrediction())
)
(formatted_predictions # pylint: disable=expression-not-assigned
| "Save Index Data" >> github_bigquery.WriteGithubIndexData(self.project,
self.index_dataset,
self.index_table,
batch_size=self.batch_size)
)
return formatted_predictions


@ -0,0 +1,87 @@
import code_search.transforms.bigquery as bigquery
class ReadOriginalGithubPythonData(bigquery.BigQueryRead):
@property
def limit(self):
return None
@property
def query_string(self):
query = """
SELECT
MAX(CONCAT(f.repo_name, ' ', f.path)) AS repo_path,
c.content
FROM
`bigquery-public-data.github_repos.files` AS f
JOIN
`bigquery-public-data.github_repos.contents` AS c
ON
f.id = c.id
JOIN (
--this part of the query makes sure repo is watched at least twice since 2017
SELECT
repo
FROM (
SELECT
repo.name AS repo
FROM
`githubarchive.year.2017`
WHERE
type="WatchEvent"
UNION ALL
SELECT
repo.name AS repo
FROM
`githubarchive.month.2018*`
WHERE
type="WatchEvent" )
GROUP BY
1
HAVING
COUNT(*) >= 2 ) AS r
ON
f.repo_name = r.repo
WHERE
f.path LIKE '%.py' AND --with python extension
c.size < 15000 AND --get rid of ridiculously long files
REGEXP_CONTAINS(c.content, r'def ') --contains function definition
GROUP BY
c.content
"""
if self.limit:
query += '\nLIMIT {}'.format(self.limit)
return query
class ReadProcessedGithubData(bigquery.BigQueryRead):
@property
def limit(self):
return 100
@property
def query_string(self):
query = """
SELECT
nwo, path, function_name, lineno, original_function, function_tokens
FROM
code_search.function_docstrings
"""
if self.limit:
query += '\nLIMIT {}'.format(self.limit)
return query
class WriteGithubIndexData(bigquery.BigQueryWrite):
@property
def column_list(self):
return [
('nwo', 'STRING'),
('path', 'STRING'),
('function_name', 'STRING'),
('lineno', 'INTEGER'),
('original_function', 'STRING'),
('function_embedding', 'STRING')
]


@ -1,102 +1,9 @@
import os
import logging
import time
import csv
import io
import csv
import apache_beam as beam
import apache_beam.io.gcp.internal.clients as clients
from apache_beam.metrics import Metrics
from apache_beam.options.pipeline_options import StandardOptions, PipelineOptions, \
GoogleCloudOptions, SetupOptions, WorkerOptions
from apache_beam.io.gcp.internal.clients import bigquery
import code_search.do_fns as do_fns
def create_pipeline_opts(args):
"""Create standard Pipeline Options for Google Cloud Dataflow"""
options = PipelineOptions()
options.view_as(StandardOptions).runner = 'DataflowRunner'
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = args.project
google_cloud_options.job_name = args.job_name
google_cloud_options.temp_location = '{}/temp'.format(args.storage_bucket)
google_cloud_options.staging_location = '{}/staging'.format(args.storage_bucket)
options.view_as(WorkerOptions).num_workers = args.num_workers
options.view_as(WorkerOptions).max_num_workers = args.max_num_workers
options.view_as(WorkerOptions).machine_type = args.machine_type
# Point to `setup.py` to allow Dataflow runner to install the package
options.view_as(SetupOptions).setup_file = os.path.join(
os.path.dirname(os.path.dirname(__file__)), 'setup.py')
return options
class SplitRepoPath(beam.DoFn):
# pylint: disable=abstract-method
"""Split the space-delimited file `repo_path` into owner repository (`nwo`)
and file path (`path`)"""
def process(self, element, *args, **kwargs): # pylint: disable=unused-argument,no-self-use
nwo, path = element.pop('repo_path').split(' ', 1)
element['nwo'] = nwo
element['path'] = path
yield element
class TokenizeCodeDocstring(beam.DoFn):
# pylint: disable=abstract-method
"""Compute code/docstring pairs from incoming BigQuery row dict"""
def __init__(self):
super(TokenizeCodeDocstring, self).__init__()
self.tokenization_time_ms = Metrics.counter(self.__class__, 'tokenization_time_ms')
def process(self, element, *args, **kwargs): # pylint: disable=unused-argument,no-self-use
try:
from preprocess.tokenizer import get_function_docstring_pairs
start_time = time.time()
element['pairs'] = get_function_docstring_pairs(element.pop('content'))
self.tokenization_time_ms.inc(int((time.time() - start_time) * 1000.0))
yield element
except Exception as e: #pylint: disable=broad-except
logging.warning('Tokenization failed, %s', e.message)
yield pvalue.TaggedOutput('err_rows', element)
class ExtractFuncInfo(beam.DoFn):
# pylint: disable=abstract-method
"""Convert pair tuples from `TokenizeCodeDocstring` into dict containing query-friendly keys"""
def __init__(self, info_keys):
super(ExtractFuncInfo, self).__init__()
self.info_keys = info_keys
def process(self, element, *args, **kwargs): # pylint: disable=unused-argument
try:
info_rows = [dict(zip(self.info_keys, pair)) for pair in element.pop('pairs')]
info_rows = [self.merge_two_dicts(info_dict, element) for info_dict in info_rows]
info_rows = map(self.dict_to_unicode, info_rows)
yield info_rows
except Exception as e: #pylint: disable=broad-except
logging.warning('Function Info extraction failed, %s', e.message)
yield pvalue.TaggedOutput('err_rows', element)
@staticmethod
def merge_two_dicts(dict_a, dict_b):
result = dict_a.copy()
result.update(dict_b)
return result
@staticmethod
def dict_to_unicode(data_dict):
for k, v in data_dict.items():
if isinstance(v, str):
data_dict[k] = v.decode('utf-8', 'ignore')
return data_dict
class ProcessGithubFiles(beam.PTransform):
@ -106,6 +13,10 @@ class ProcessGithubFiles(beam.PTransform):
and writes the processed code-docstring pairs in a query-friendly format back to BigQuery
table.
"""
data_columns = ['nwo', 'path', 'function_name', 'lineno', 'original_function',
'function_tokens', 'docstring_tokens']
data_types = ['STRING', 'STRING', 'STRING', 'INTEGER', 'STRING', 'STRING', 'STRING']
def __init__(self, project, query_string, output_string, storage_bucket):
super(ProcessGithubFiles, self).__init__()
@ -114,18 +25,14 @@ class ProcessGithubFiles(beam.PTransform):
self.output_dataset, self.output_table = output_string.split(':')
self.storage_bucket = storage_bucket
self.data_columns = ['nwo', 'path', 'function_name', 'lineno', 'original_function',
'function_tokens', 'docstring_tokens']
self.data_types = ['STRING', 'STRING', 'STRING', 'INTEGER', 'STRING', 'STRING', 'STRING']
self.num_shards = 10
def expand(self, input_or_inputs):
tokenize_result = (input_or_inputs
| "Read Github Dataset" >> beam.io.Read(beam.io.BigQuerySource(query=self.query_string,
use_standard_sql=True))
| "Split 'repo_path'" >> beam.ParDo(do_fns.SplitRepoPath())
| "Tokenize Code/Docstring Pairs" >> beam.ParDo(do_fns.TokenizeCodeDocstring())
.with_outputs('err_rows', main='rows')
)
@ -140,7 +47,7 @@ class ProcessGithubFiles(beam.PTransform):
info_result = (tokenize_result.rows
| "Extract Function Info" >> beam.ParDo(do_fns.ExtractFuncInfo(self.data_columns[2:]))
.with_outputs('err_rows', main='rows')
)
@ -172,6 +79,16 @@ class ProcessGithubFiles(beam.PTransform):
schema=self.create_output_schema())
)
@staticmethod
def get_key_list():
filter_keys = [
'original_function',
'lineno',
]
key_list = [col for col in ProcessGithubFiles.data_columns
if col not in filter_keys]
return key_list
def format_for_write(self, row):
"""This method filters keys that we don't need in the
final CSV. It must ensure that there are no multi-line
@ -180,11 +97,7 @@ class ProcessGithubFiles(beam.PTransform):
derived Dataflow steps. This uses the CSV Writer
to handle all edge cases like quote escaping."""
target_keys = self.get_key_list()
'original_function',
'lineno',
]
target_keys = [col for col in self.data_columns if col not in filter_keys]
target_values = [row[key].encode('utf-8') for key in target_keys]
with io.BytesIO() as fs:
@ -195,10 +108,10 @@ class ProcessGithubFiles(beam.PTransform):
return result_str
def create_output_schema(self):
table_schema = clients.bigquery.TableSchema()
for column, data_type in zip(self.data_columns, self.data_types):
field_schema = clients.bigquery.TableFieldSchema()
field_schema.name = column
field_schema.type = data_type
field_schema.mode = 'nullable'
@ -207,11 +120,11 @@ class ProcessGithubFiles(beam.PTransform):
return table_schema
def create_failed_output_schema(self):
table_schema = clients.bigquery.TableSchema()
for column, data_type in zip(self.data_columns[:2] + ['content'],
self.data_types[:2] + ['STRING']):
field_schema = clients.bigquery.TableFieldSchema()
field_schema.name = column
field_schema.type = data_type
field_schema.mode = 'nullable'


@ -1,6 +1,6 @@
import ast
import astor
import nltk.tokenize as tokenize
import spacy
@ -13,7 +13,7 @@ def tokenize_docstring(text):
def tokenize_code(text):
"""A very basic procedure for tokenizing code strings."""
return tokenize.RegexpTokenizer(r'\w+').tokenize(text)
def get_function_docstring_pairs(blob):


@ -1,8 +1,12 @@
astor~=0.6.0
apache-beam[gcp]~=2.5.0
Flask~=1.0.0
google-cloud-storage~=1.10.0
nltk~=3.3.0
nmslib~=1.7.0
numpy~=1.14.0
oauth2client~=4.1.0
requests~=2.18.0
spacy~=2.0.0
tensor2tensor~=1.6.0
tensorflow~=1.8.0


@ -3,13 +3,18 @@ import subprocess
from distutils.command.build import build as distutils_build #pylint: disable=no-name-in-module
from setuptools import setup, find_packages, Command as SetupToolsCommand
with open('requirements.txt', 'r') as f:
install_requires = f.readlines()
VERSION = '0.1.0'
CUSTOM_COMMANDS = [
['python', '-m', 'spacy', 'download', 'en'],
##
# TODO(sanyamkapoor): This isn't ideal but no other way for a seamless install right now.
# This currently uses a fork due to API limitations (See kubeflow/batch-predict#10). The
# API limitations have a workaround via kubeflow/batch-predict#9.
['pip', 'install', 'https://github.com/activatedgeek/batch-predict/tarball/fix-value-provider']
]
@ -39,10 +44,10 @@ class CustomCommands(SetupToolsCommand):
self.run_custom_command(command)
setup(name='code-search',
description='Kubeflow Code Search Demo',
url='https://www.github.com/kubeflow/examples',
author='Google',
author_email='sanyamkapoor@google.com',
version=VERSION,
license='MIT',
@ -52,4 +57,12 @@ setup(name='kubeflow-code-search',
cmdclass={
'build': Build,
'CustomCommands': CustomCommands,
},
entry_points={
'console_scripts': [
'code-search-preprocess=code_search.cli:create_github_pipeline',
'code-search-predict=code_search.cli:create_batch_predict_pipeline',
'nmslib-serve=code_search.nmslib.cli:server',
'nmslib-create=code_search.nmslib.cli:creator',
]
})


@ -1,11 +0,0 @@
FROM node:10.6
EXPOSE 5000
RUN npm i -g serve
ADD ./build /webapp
ENTRYPOINT ["serve"]
CMD ["-l", "5000", "-n", "/webapp"]