Add custom metrics, write raw tokens to GCS (#141)

* Add custom metrics, write raw tokens to GCS

* Change number of output file shards to 1
Sanyam Kapoor 2018-06-13 12:03:27 -07:00 committed by k8s-ci-robot
parent ce2f1db11e
commit 242c2e6d20
2 changed files with 43 additions and 15 deletions
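
For readers unfamiliar with Beam custom metrics, the pattern this change uses is a `Metrics.counter` created in the `DoFn` constructor, incremented inside `process`, and read back from the pipeline result (on Dataflow it also appears among the job's custom counters). Below is a minimal, self-contained sketch of that pattern; the `ToyTokenize` DoFn and its inline input are illustrative stand-ins, not code from this repository.

# Sketch only: mirrors the tokenization_time_ms counter added to TokenizeCodeDocstring.
import time

import apache_beam as beam
from apache_beam.metrics import Metrics
from apache_beam.metrics.metric import MetricsFilter


class ToyTokenize(beam.DoFn):
  """Stand-in DoFn; the real transform is TokenizeCodeDocstring in the diff below."""

  def __init__(self):
    super(ToyTokenize, self).__init__()
    self.tokenization_time_ms = Metrics.counter(self.__class__, 'tokenization_time_ms')

  def process(self, element):
    start_time = time.time()
    tokens = element.split()  # stand-in for get_function_docstring_pairs
    self.tokenization_time_ms.inc(int((time.time() - start_time) * 1000.0))
    yield tokens


pipeline = beam.Pipeline()
_ = (pipeline
     | beam.Create(['def add(a, b): return a + b'])
     | beam.ParDo(ToyTokenize()))
result = pipeline.run()
result.wait_until_finish()

# Counters are aggregated across workers; query them off the pipeline result.
for counter in result.metrics().query(
    MetricsFilter().with_name('tokenization_time_ms'))['counters']:
  print('{}: {}'.format(counter.key.metric.name, counter.committed))
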

View File

@@ -1,7 +1,10 @@
 import os
+import logging
+import time
 import apache_beam as beam
 import apache_beam.io as io
 from apache_beam import pvalue
+from apache_beam.metrics import Metrics
 from apache_beam.options.pipeline_options import StandardOptions, PipelineOptions, \
   GoogleCloudOptions, SetupOptions, WorkerOptions
 from apache_beam.io.gcp.internal.clients import bigquery
@@ -15,8 +18,8 @@ def create_pipeline_opts(args):
   google_cloud_options = options.view_as(GoogleCloudOptions)
   google_cloud_options.project = args.project
   google_cloud_options.job_name = args.job_name
-  google_cloud_options.temp_location = '{}/{}/temp'.format(args.storage_bucket, args.job_name)
-  google_cloud_options.staging_location = '{}/{}/staging'.format(args.storage_bucket, args.job_name)
+  google_cloud_options.temp_location = '{}/temp'.format(args.storage_bucket)
+  google_cloud_options.staging_location = '{}/staging'.format(args.storage_bucket)
   options.view_as(WorkerOptions).num_workers = args.num_workers
   options.view_as(WorkerOptions).max_num_workers = args.max_num_workers
@@ -44,15 +47,25 @@ class SplitRepoPath(beam.DoFn):
 class TokenizeCodeDocstring(beam.DoFn):
   # pylint: disable=abstract-method
   """Compute code/docstring pairs from incoming BigQuery row dict"""
+  def __init__(self):
+    super(TokenizeCodeDocstring, self).__init__()
+    self.tokenization_time_ms = Metrics.counter(self.__class__, 'tokenization_time_ms')
+
   def process(self, element, *args, **kwargs): # pylint: disable=unused-argument,no-self-use
     try:
       from preprocess.tokenizer import get_function_docstring_pairs
+      start_time = time.time()
       element['pairs'] = get_function_docstring_pairs(element.pop('content'))
+      self.tokenization_time_ms.inc(int((time.time() - start_time) * 1000.0))
       yield element
-    except: #pylint: disable=bare-except
+    except Exception as e: #pylint: disable=broad-except
+      logging.warning('Tokenization failed, %s', e.message)
       yield pvalue.TaggedOutput('err_rows', element)

 class ExtractFuncInfo(beam.DoFn):
   # pylint: disable=abstract-method
   """Convert pair tuples from `TokenizeCodeDocstring` into dict containing query-friendly keys"""
@@ -67,7 +80,8 @@ class ExtractFuncInfo(beam.DoFn):
       info_rows = [self.merge_two_dicts(info_dict, element) for info_dict in info_rows]
       info_rows = map(self.dict_to_unicode, info_rows)
       yield info_rows
-    except: #pylint: disable=bare-except
+    except Exception as e: #pylint: disable=broad-except
+      logging.warning('Function Info extraction failed, %s', e.message)
       yield pvalue.TaggedOutput('err_rows', element)

   @staticmethod
@@ -85,22 +99,25 @@ class ExtractFuncInfo(beam.DoFn):
 class ProcessGithubFiles(beam.PTransform):
+  # pylint: disable=too-many-instance-attributes
   """A collection of `DoFn`s for Pipeline Transform. Reads the Github dataset from BigQuery
   and writes back the processed code-docstring pairs in a query-friendly format back to BigQuery
   table.
   """
-  def __init__(self, project, query_string, output_string):
+  def __init__(self, project, query_string, output_string, storage_bucket):
     super(ProcessGithubFiles, self).__init__()
     self.project = project
     self.query_string = query_string
     self.output_dataset, self.output_table = output_string.split(':')
+    self.storage_bucket = storage_bucket
     self.data_columns = ['nwo', 'path', 'function_name', 'lineno', 'original_function',
                          'function_tokens', 'docstring_tokens']
     self.data_types = ['STRING', 'STRING', 'STRING', 'INTEGER', 'STRING', 'STRING', 'STRING']
     self.batch_size = 1000
+    self.num_shards = 1

   def expand(self, input_or_inputs):
     tokenize_result = (input_or_inputs
@@ -116,8 +133,7 @@ class ProcessGithubFiles(beam.PTransform):
       | "Failed Row Tokenization" >> io.WriteToBigQuery(project=self.project,
                                                         dataset=self.output_dataset,
                                                         table=self.output_table + '_failed',
-                                                        schema=self.create_failed_output_schema(),
-                                                        batch_size=self.batch_size)
+                                                        schema=self.create_failed_output_schema())
     )
     # pylint: enable=expression-not-assigned
@@ -132,18 +148,30 @@ class ProcessGithubFiles(beam.PTransform):
       | "Failed Function Info" >> io.WriteToBigQuery(project=self.project,
                                                      dataset=self.output_dataset,
                                                      table=self.output_table + '_failed',
-                                                     schema=self.create_failed_output_schema(),
-                                                     batch_size=self.batch_size)
+                                                     schema=self.create_failed_output_schema())
     )
     # pylint: enable=expression-not-assigned

-    return (info_result.rows
-            | "Flatten Rows" >> beam.FlatMap(lambda x: x)
+    processed_rows = (info_result.rows | "Flatten Rows" >> beam.FlatMap(lambda x: x))
+
+    # pylint: disable=expression-not-assigned
+    (processed_rows
+     | "Filter Function tokens" >> beam.Map(lambda x: x['function_tokens'])
+     | "Write Function tokens" >> io.WriteToText('{}/raw_data/data'.format(self.storage_bucket),
+                                                 file_name_suffix='.function',
+                                                 num_shards=self.num_shards))
+
+    (processed_rows
+     | "Filter Docstring tokens" >> beam.Map(lambda x: x['docstring_tokens'])
+     | "Write Docstring tokens" >> io.WriteToText('{}/raw_data/data'.format(self.storage_bucket),
+                                                  file_name_suffix='.docstring',
+                                                  num_shards=self.num_shards))
+    # pylint: enable=expression-not-assigned
+
+    return (processed_rows
             | "Save Tokens" >> io.WriteToBigQuery(project=self.project,
                                                   dataset=self.output_dataset,
                                                   table=self.output_table,
-                                                  schema=self.create_output_schema(),
-                                                  batch_size=self.batch_size)
+                                                  schema=self.create_output_schema())
     )

   def create_output_schema(self):
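
The raw-token writes added above are plain `WriteToText` sinks; with `num_shards=1` each token type lands in a single file, which Beam still names with its sharded scheme (`data-00000-of-00001.function` and `data-00000-of-00001.docstring` under `raw_data/`). A stripped-down sketch of just that part, with a placeholder bucket path and inline rows standing in for the BigQuery-fed PCollection:

import apache_beam as beam
import apache_beam.io as io

# Placeholder; in the pipeline this comes from the new storage_bucket argument.
# For a quick DirectRunner test, point it at a local directory instead of GCS.
storage_bucket = 'gs://my-bucket'

with beam.Pipeline() as pipeline:
  processed_rows = pipeline | "Create Rows" >> beam.Create([
      {'function_tokens': 'def add a b return a b', 'docstring_tokens': 'add two numbers'},
  ])

  # pylint: disable=expression-not-assigned
  (processed_rows
   | "Filter Function tokens" >> beam.Map(lambda x: x['function_tokens'])
   | "Write Function tokens" >> io.WriteToText('{}/raw_data/data'.format(storage_bucket),
                                               file_name_suffix='.function',
                                               num_shards=1))

  (processed_rows
   | "Filter Docstring tokens" >> beam.Map(lambda x: x['docstring_tokens'])
   | "Write Docstring tokens" >> io.WriteToText('{}/raw_data/data'.format(storage_bucket),
                                                file_name_suffix='.docstring',
                                                num_shards=1))
  # pylint: enable=expression-not-assigned
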

View File

@@ -32,7 +32,7 @@ def main(args):
     query_string = f.read()

   pipeline = beam.Pipeline(options=pipeline_opts)
-  (pipeline | ProcessGithubFiles(args.project, query_string, args.output)) #pylint: disable=expression-not-assigned
+  (pipeline | ProcessGithubFiles(args.project, query_string, args.output, args.storage_bucket)) #pylint: disable=expression-not-assigned
   pipeline.run()
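
For completeness, a hypothetical sketch of the argument wiring this change relies on. Only the attribute names (`args.project`, `args.job_name`, `args.storage_bucket`, `args.output`, `args.num_workers`, `args.max_num_workers`) are taken from the diff; the flag names, defaults, and help text below are assumptions, not the repository's actual CLI.

import argparse


def parse_args(argv=None):
  # Assumed flag names; the real parser lives elsewhere in this repo.
  parser = argparse.ArgumentParser(description='Preprocess GitHub code/docstring pairs.')
  parser.add_argument('--project', required=True,
                      help='GCP project that runs Dataflow and hosts BigQuery.')
  parser.add_argument('--job_name', required=True, help='Dataflow job name.')
  parser.add_argument('--storage_bucket', required=True,
                      help='GCS prefix (gs://...) for staging/temp files and raw token output.')
  parser.add_argument('--output', required=True,
                      help='Target BigQuery table in dataset:table form.')
  parser.add_argument('--num_workers', type=int, default=1)
  parser.add_argument('--max_num_workers', type=int, default=1)
  return parser.parse_args(argv)
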