Dataflow job should support writing embeddings to a different location (Fix #366). (#388)

* Dataflow job should support writing embeddings to a different location (Fix #366).

* The Dataflow job that computes code embeddings needs parameters controlling
  the location of the outputs independently of the inputs. Prior to this fix,
  the same table in the dataset was always written to and the files were always
  created in the data dir.

* This made it very difficult to rerun the embeddings job on the latest GitHub
  data (e.g. to regularly update the code embeddings) without overwriting
  the current embeddings.

* Refactor how we create BQ sinks and sources in this pipeline.

  * Rather than creating a wrapper class that bundles together a sink and a
    schema, we should have a separate helper class for creating BQ schemas and
    then use WriteToBigQuery directly.

  * Similarly, for read transforms we don't need a wrapper class that bundles
    a query and a source. We can just create a class/constant to represent
    queries and pass them directly to the appropriate source (see the sketch
    below).
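
  A minimal sketch of this pattern, assuming Beam's standard BigQuerySource /
  WriteToBigQuery APIs. The project, dataset, and table names are placeholders,
  and build_schema is a hypothetical stand-in for the schema helper described
  above:

    import apache_beam as beam
    from apache_beam.io.gcp.internal.clients import bigquery as bq_clients

    # A plain constant replaces the old query/source wrapper class.
    TOKEN_PAIRS_QUERY = """
      SELECT nwo, path, function_name, lineno, original_function,
             function_tokens, docstring_tokens
      FROM `my-project.code_search.token_pairs`
    """

    def build_schema(columns):
      # Build a TableSchema from (name, type) tuples.
      schema = bq_clients.TableSchema()
      for name, col_type in columns:
        field = bq_clients.TableFieldSchema()
        field.name = name
        field.type = col_type
        field.mode = 'NULLABLE'
        schema.fields.append(field)
      return schema

    with beam.Pipeline() as p:
      rows = p | "Read token pairs" >> beam.io.Read(
          beam.io.BigQuerySource(query=TOKEN_PAIRS_QUERY, use_standard_sql=True))
      # ... compute embeddings on `rows` here ...
      _ = rows | "Write embeddings" >> beam.io.WriteToBigQuery(
          table="my-project:code_search.code_embeddings",
          schema=build_schema([("nwo", "STRING"),
                               ("function_embedding", "STRING")]))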

* Change the BQ write disposition to WRITE_EMPTY so we don't overwrite existing
  data (see the snippet below).
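
  For reference, a minimal snippet of the new disposition, using Beam's standard
  BigQueryDisposition values (the table name is a placeholder):

    beam.io.WriteToBigQuery(
        table="my-project:code_search.code_embeddings",
        schema=embeddings_schema,  # TableSchema built by the helper above
        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
        write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY)
    # WRITE_EMPTY makes the write fail if the table already contains data,
    # instead of truncating it as WRITE_TRUNCATE did before.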

* Fix #390: worker setup fails because requirements.dataflow.txt is not found.

  * Dataflow workers always download the requirements file to requirements.txt,
    regardless of which local file was used as the source.

  * When the job is submitted, Dataflow also tries to build an sdist package on
    the client, which invokes setup.py.

  * So in setup.py we always refer to requirements.txt.

  * When installing the package in other contexts, requirements.dataflow.txt
    should be renamed to requirements.txt.

  * We do this in the Dockerfile via a symlink; a condensed sketch of the
    setup.py side is shown below.
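
  The setup.py change later in this diff follows the same convention; condensed,
  the guard looks roughly like this (the warning text is paraphrased):

    import os

    # Dataflow workers stage the requirements file as requirements.txt, so
    # setup.py reads that name and warns if only the .dataflow variant exists.
    if not os.path.exists('requirements.txt') and \
        os.path.exists('requirements.dataflow.txt'):
      print('requirements.txt not found; you probably need to rename '
            'requirements.dataflow.txt to requirements.txt.')

    with open('requirements.txt', 'r') as f:
      install_requires = f.readlines()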

* Refactor the CreateFunctionEmbeddings code so that writing to BQ is no longer
  part of the code that computes the function embeddings; this will make it
  easier to test (see the sketch below).
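
  For example, a hypothetical unit test could now run the compute transform
  against an in-memory PCollection and assert on its output without any BigQuery
  sink. TestPipeline, assert_that, and equal_to are Beam's standard testing
  utilities; the toy transform and data below are placeholders, not the real
  FunctionEmbeddings transform:

    import apache_beam as beam
    from apache_beam.testing.test_pipeline import TestPipeline
    from apache_beam.testing.util import assert_that, equal_to

    class AddLengths(beam.PTransform):
      # Stand-in for a compute-only transform such as FunctionEmbeddings.
      def expand(self, pcoll):
        return pcoll | beam.Map(lambda row: dict(
            row, function_embedding=str(len(row["original_function"]))))

    with TestPipeline() as p:
      rows = p | beam.Create([{"original_function": "def f(): pass"}])
      result = rows | AddLengths()
      assert_that(result, equal_to(
          [{"original_function": "def f(): pass", "function_embedding": "13"}]))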

* Fix typo in the jsonnet for the output dir; it was missing an "=".
Jeremy Lewi 2018-12-02 09:51:27 -08:00 committed by Kubernetes Prow Robot
parent e8cf9c58ce
commit 78fdc74b56
10 changed files with 129 additions and 59 deletions

View File

@ -22,6 +22,11 @@ RUN python -m spacy download en
ADD src/code_search /app/code_search
ADD src /src
# See: https://github.com/kubeflow/examples/issues/390
# Dataflow will try to build a source package locally and we need
# the path to match what we have in setup.py.
RUN ln -sf /src/requirements.dataflow.txt /src/requirements.txt
WORKDIR /src
ENV PYTHONIOENCODING=utf-8 T2T_USR_DIR=/app/code_search/t2t

View File

@ -13,6 +13,19 @@
problem: "kf_github_function_docstring",
model: "kf_similarity_transformer",
// The table containing the token pairs for (docstring, code)
bqDataset: "code_search",
//tokenPairsBQTable: self.project,
//functionEmbeddingsBQTable: "someothervalue",
tokenPairsBQTable: self.project + ":" + self.bqDataset + ".token_pairs",
jobNameSuffix: "20181201-1530",
bqSuffix: std.strReplace(self.jobNameSuffix, "-", "_"),
functionEmbeddingsBQTable: self.project + ":" + self.bqDataset + ".code_embeddings_" + self.bqSuffix,
// Location where the function embeddings should be written.
functionEmbeddingsDir: "gs://code-search-demo/20181130/code_embeddings",
// Location to write the index file for nmslib and the file to be used as the reverse lookup
// with the index server.
lookupFile: "gs://code-search-demo/20181104/code-embeddings-index/embedding-to-info.csv",

View File

@ -15,7 +15,7 @@
// are not picked up by the individual components.
// Need to see if we can find a way to fix this.
local imageTag = "v20181127-08f8c05-dirty-d9f034",
local imageTag = "v20181201-ae61193-dirty-d11191",
"t2t-job": {
jobType: "trainer",
@ -111,17 +111,16 @@
"submit-code-embeddings-job": {
name: "submit-code-embeddings-job",
image: $.components["t2t-job"].dataflowImage,
// Input table this should be of the form PROJECT:DATASET.table
inputTable: "",
// Big query table where results will be written.
targetDataset: "code_search",
workingDir: $.components["t2t-code-search"].workingDir,
dataDir: self.workingDir + "/data",
targetDataset: "code_search",
// Directory where the model is stored.
modelDir: "",
jobName: "submit-code-embeddings-job",
jobNameSuffix: "",
workerMachineType: "n1-highcpu-32",
numWorkers: 5,
project: "",
waitUntilFinish: "false",
},

View File

@ -30,9 +30,11 @@
"python2",
"-m",
"code_search.dataflow.cli.create_function_embeddings",
"--runner=DataflowRunner",
"--runner=DataflowRunner",
"--project=" + params.project,
"--target_dataset=" + params.targetDataset,
"--token_pairs_table=" + params.tokenPairsBQTable,
"--function_embeddings_table=" + params.functionEmbeddingsBQTable,
"--output_dir=" + params.functionEmbeddingsDir,
"--data_dir=" + params.dataDir,
"--problem=" + params.problem,
"--job_name=" + params.jobName + '-' + params.jobNameSuffix,
@ -41,7 +43,7 @@
"--staging_location=" + params.workingDir + "/dataflow/staging",
"--worker_machine_type=" + params.workerMachineType,
"--num_workers=" + params.numWorkers,
"--requirements_file=requirements.dataflow.txt",
"--requirements_file=requirements.txt",
if (params.waitUntilFinish == "true") then
"--wait_until_finished"
else [],

View File

@ -44,13 +44,23 @@ def add_parser_arguments(parser):
'specified we run a query to filter the data.'))
predict_args_parser = parser.add_argument_group('Batch Prediction Arguments')
predict_args_parser.add_argument('--token_pairs_table', metavar='', type=str,
help='The BigQuery table containing the '
'token pairs. This should be '
'of the form PROJECT:DATASET.TABLE.')
predict_args_parser.add_argument('--function_embeddings_table', metavar='', type=str,
help='The BigQuery table to write the '
'function embeddings too. This should be '
'of the form PROJECT:DATASET.TABLE.')
predict_args_parser.add_argument('--problem', metavar='', type=str,
help='Name of the T2T problem')
predict_args_parser.add_argument('--data_dir', metavar='', type=str,
help='Path to directory of the T2T problem data')
predict_args_parser.add_argument('--saved_model_dir', metavar='', type=str,
help='Path to directory containing Tensorflow SavedModel')
predict_args_parser.add_argument('--output_dir', metavar='', type=str,
help='Path to directory where the output '
'should be written.')
def prepare_pipeline_opts(argv=None):
"""Prepare pipeline options from CLI arguments.

View File

@ -4,6 +4,7 @@ import logging
import apache_beam as beam
import code_search.dataflow.cli.arguments as arguments
from code_search.dataflow.transforms import bigquery
import code_search.dataflow.transforms.github_bigquery as gh_bq
import code_search.dataflow.transforms.function_embeddings as func_embed
import code_search.dataflow.do_fns.dict_to_csv as dict_to_csv
@ -16,7 +17,7 @@ def create_function_embeddings(argv=None):
- Read the Processed Github Dataset from BigQuery
- Encode the functions using T2T problem
- Get function embeddings using `kubeflow_batch_predict.dataflow.batch_prediction`
- All results are stored in a BigQuery dataset (`args.target_dataset`)
- All results are stored in a BigQuery dataset (`args.function_embeddings_table`)
- See `transforms.github_dataset.GithubBatchPredict` for details of tables created
- Additionally, store CSV of docstring, original functions and other metadata for
reverse index lookup during search engine queries.
@ -29,26 +30,45 @@ def create_function_embeddings(argv=None):
pipeline = beam.Pipeline(options=pipeline_opts)
token_pairs = (pipeline
| "Read Transformed Github Dataset" >> gh_bq.ReadTransformedGithubDataset(
args.project, dataset=args.target_dataset)
| "Compute Function Embeddings" >> func_embed.FunctionEmbeddings(args.project,
args.target_dataset,
args.problem,
token_pairs_query = gh_bq.ReadTransformedGithubDatasetQuery(
args.token_pairs_table)
token_pairs_source = beam.io.BigQuerySource(
query=token_pairs_query.query_string, use_standard_sql=True)
embeddings = (pipeline
| "Read Transformed Github Dataset" >> beam.io.Read(token_pairs_source)
| "Compute Function Embeddings" >> func_embed.FunctionEmbeddings(args.problem,
args.data_dir,
args.saved_model_dir)
)
(token_pairs # pylint: disable=expression-not-assigned
function_embeddings_schema = bigquery.BigQuerySchema([
('nwo', 'STRING'),
('path', 'STRING'),
('function_name', 'STRING'),
('lineno', 'STRING'),
('original_function', 'STRING'),
('function_embedding', 'STRING')
])
(embeddings # pylint: disable=expression-not-assigned
| "Save Function Embeddings" >>
beam.io.WriteToBigQuery(table=args.function_embeddings_table,
schema=function_embeddings_schema.table_schema,
create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY)
)
(embeddings # pylint: disable=expression-not-assigned
| "Format for CSV Write" >> beam.ParDo(dict_to_csv.DictToCSVString(
['nwo', 'path', 'function_name', 'lineno', 'original_function', 'function_embedding']))
| "Write Embeddings to CSV" >> beam.io.WriteToText('{}/func-index'.format(args.data_dir),
| "Write Embeddings to CSV" >> beam.io.WriteToText('{}/func-index'.format(args.output_dir),
file_name_suffix='.csv',
num_shards=100)
)
result = pipeline.run()
logging.info("Submitted Dataflow job: %s", result)
# TODO(jlewi): Doesn't dataflow define a default option.
if args.wait_until_finished:
result.wait_until_finish()

View File

@ -3,6 +3,9 @@ import apache_beam.io.gcp.bigquery as bigquery
import apache_beam.io.gcp.internal.clients as clients
# TODO(jlewi): Is this class necessary? Seems like a layer of indirection
# Around BigQuerySource. I think a better pattern might just be to create
# constants / classes defining the queries.
class BigQueryRead(beam.PTransform):
"""Wrapper over Apache Beam Big Query Read.
@ -39,6 +42,30 @@ class BigQueryRead(beam.PTransform):
)
class BigQuerySchema(object):
"""Class for representing BigQuery schemas."""
def __init__(self, columns):
"""Construct the schema.
Args: list of tuples defining the BigQuerySchema [
('column_name', 'column_type')
]
"""
self.columns = columns
self.table_schema = clients.bigquery.TableSchema()
for column_name, column_type in self.columns:
field_schema = clients.bigquery.TableFieldSchema()
field_schema.name = column_name
field_schema.type = column_type
field_schema.mode = 'nullable'
self.table_schema.fields.append(field_schema)
# TODO(https://github.com/kubeflow/examples/issues/381):
# We should probably refactor this into a separate
# class for helping with schemas and not bundle that with the BigQuerySink.
class BigQueryWrite(beam.PTransform):
"""Wrapper over Apache Beam BigQuery Write.

View File

@ -2,7 +2,6 @@ import apache_beam as beam
import code_search.dataflow.do_fns.prediction_do_fn as pred
import code_search.dataflow.do_fns.function_embeddings as func_embeddings # pylint: disable=no-name-in-module
import code_search.dataflow.transforms.github_bigquery as github_bigquery
class FunctionEmbeddings(beam.PTransform):
@ -12,13 +11,16 @@ class FunctionEmbeddings(beam.PTransform):
prepares each element's function tokens for prediction
by encoding it into base64 format and returns an updated
dictionary element with the embedding for further processing.
Args:
project: The project
target_table: The table to write to. Should be of the form
"project:dataset.table"
"""
def __init__(self, project, target_dataset, problem, data_dir, saved_model_dir):
def __init__(self, problem, data_dir, saved_model_dir):
super(FunctionEmbeddings, self).__init__()
self.project = project
self.target_dataset = target_dataset
self.problem = problem
self.data_dir = data_dir
self.saved_model_dir = saved_model_dir
@ -38,9 +40,4 @@ class FunctionEmbeddings(beam.PTransform):
| "Process Function Embeddings" >> beam.ParDo(func_embeddings.ProcessFunctionEmbedding())
)
(formatted_predictions # pylint: disable=expression-not-assigned
| "Save Function Embeddings" >> github_bigquery.WriteGithubFunctionEmbeddings(
self.project, self.target_dataset)
)
return formatted_predictions

View File

@ -1,4 +1,3 @@
import apache_beam.io.gcp.bigquery as bigquery
import code_search.dataflow.transforms.bigquery as bq_transform
@ -99,45 +98,31 @@ class WriteTokenizedData(bq_transform.BigQueryWrite):
]
class ReadTransformedGithubDataset(bq_transform.BigQueryRead):
class ReadTransformedGithubDatasetQuery(object):
def __init__(self, project, dataset=None, table=PAIRS_TABLE):
super(ReadTransformedGithubDataset, self).__init__(project, dataset=dataset, table=table)
def __init__(self, table, limit=None):
"""Query to select the transformed (token pairs) from BigQuery.
@property
def limit(self):
# return 500
return None
Args:
table: The table to query.
limit: Limit if needed.
"""
self.table = table
self.limit = limit
@property
def query_string(self):
# https://cloud.google.com/bigquery/docs/reference/standard-sql/enabling-standard-sql
table = self.table
# In SQL queries table format uses period not :.
table = table.replace(":", ".")
query = """
SELECT
SELECT
nwo, path, function_name, lineno, original_function, function_tokens, docstring_tokens
FROM
{}.{}
""".format(self.dataset, self.table)
`{0}`
""".format(table)
if self.limit:
query += '\nLIMIT {}'.format(self.limit)
return query
class WriteGithubFunctionEmbeddings(bq_transform.BigQueryWrite):
def __init__(self, project, dataset, table=FUNCTION_EMBEDDINGS_TABLE, batch_size=500,
write_disposition=bigquery.BigQueryDisposition.WRITE_TRUNCATE):
super(WriteGithubFunctionEmbeddings, self).__init__(project, dataset, table,
batch_size=batch_size,
write_disposition=write_disposition)
@property
def column_list(self):
return [
('nwo', 'STRING'),
('path', 'STRING'),
('function_name', 'STRING'),
('lineno', 'STRING'),
('original_function', 'STRING'),
('function_embedding', 'STRING')
]

View File

@ -1,11 +1,23 @@
from __future__ import print_function
import os
import subprocess
from distutils.command.build import build as distutils_build #pylint: disable=no-name-in-module
from setuptools import setup, find_packages, Command as SetupToolsCommand
VERSION = '0.1.dev0'
with open('requirements.dataflow.txt', 'r') as f:
# Dataflow workers always download the requirements file to
# requirements.txt regardless of what file was used as the source.
# So we follow that convention. If running on some other platform
# you may need to rename the file
if not os.path.exists('requirements.txt'):
if os.path.exists('requirements.dataflow.txt'):
print('Error: requirements.txt does not exist but '
'requirements.dataflow.txt does.')
print('You probably need to rename requirements.dataflow.txt to '
'requirements.txt.')
with open('requirements.txt', 'r') as f:
install_requires = f.readlines()
CUSTOM_COMMANDS = [