Semantic Code Search Example Data Ingestion (#120)

* Code Search Preprocessing Pipeline

* Add missing pipeline execution to git tree

* Move the preprocessing step into its own package

* Add docstrings

* Fix pylint errors
Authored by Sanyam Kapoor on 2018-05-31 15:28:56 -07:00; committed by k8s-ci-robot
parent 174d6602ac
commit 26ff66d747
10 changed files with 458 additions and 0 deletions

code_search/preprocess/.gitignore

@@ -0,0 +1,108 @@
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
# Usually these files are written by a python script from a template
# before PyInstaller builds the exe, so as to inject date/other infos into it.
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
.pytest_cache/
# Translations
*.mo
*.pot
# Django stuff:
*.log
local_settings.py
db.sqlite3
# Flask stuff:
instance/
.webassets-cache
# Scrapy stuff:
.scrapy
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# pyenv
.python-version
# celery beat schedule file
celerybeat-schedule
# SageMath parsed files
*.sage.py
# Environments
.env
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# Spyder project settings
.spyderproject
.spyproject
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
# Virtual Environments
venv/

code_search/preprocess/MANIFEST.in

@@ -0,0 +1,2 @@
include README.md requirements.txt
include files/*

code_search/preprocess/README.md

@@ -0,0 +1,51 @@
# Semantic Code Search
Pre-processing Pipeline package for End-to-End Semantic Code Search on Kubeflow
## Prerequisites
* Python 2.7 (with `pip`)
* Python `virtualenv`
**NOTE**: This package uses Google Cloud Dataflow, which only supports Python 2.7.
## Setup
* Set up a Python virtual environment
```
$ virtualenv venv
$ source venv/bin/activate
```
* Install the [`gcloud`](https://cloud.google.com/sdk/gcloud/) CLI
* Set up Application Default Credentials
```
$ gcloud auth application-default login
```
* Enable the Dataflow API via the command line (or the Google Cloud Console)
```
$ gcloud services enable dataflow.googleapis.com
```
* Build and install the package
```
$ python setup.py build install
```
## Execution
Submit a `Dataflow` job using the following command:
```
$ python scripts/process_github_archive.py -i files/select_github_archive.sql -o code_search:function_docstrings \
    -p kubeflow-dev -j process-github-archive --storage-bucket gs://kubeflow-dev
```
**NOTE**: Make sure the project and the Google Cloud Storage bucket have been created before submitting the job.
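Once the job finishes, the output table can be spot-checked, for example with the `bq` CLI (dataset and table names as in the command above):
```
$ bq query --use_legacy_sql=false \
    'SELECT nwo, path, function_name FROM `code_search.function_docstrings` LIMIT 5'
```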
## Acknowledgements
This project derives from [hamelsmu/code_search](https://github.com/hamelsmu/code_search).

code_search/preprocess/files/select_github_archive.sql

@@ -0,0 +1,42 @@
SELECT
MAX(CONCAT(f.repo_name, ' ', f.path)) AS repo_path,
c.content
FROM
`bigquery-public-data.github_repos.files` AS f
JOIN
`bigquery-public-data.github_repos.contents` AS c
ON
f.id = c.id
JOIN (
--this part of the query makes sure repo is watched at least twice since 2017
SELECT
repo
FROM (
SELECT
repo.name AS repo
FROM
`githubarchive.year.2017`
WHERE
type="WatchEvent"
UNION ALL
SELECT
repo.name AS repo
FROM
`githubarchive.month.2018*`
WHERE
type="WatchEvent" )
GROUP BY
1
HAVING
COUNT(*) >= 2 ) AS r
ON
f.repo_name = r.repo
WHERE
f.path LIKE '%.py' AND --with python extension
c.size < 15000 AND --get rid of ridiculously long files
REGEXP_CONTAINS(c.content, r'def ') --contains function definition
GROUP BY
c.content
-- for development purposes only
LIMIT
1000000
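
Before launching the Dataflow job, the query above can be validated and its scan cost estimated with a BigQuery dry run. A minimal sketch using the `google-cloud-bigquery` client (not listed in `requirements.txt`; the project ID and SQL path follow the README example):

```python
# Sketch: dry-run the extraction query to validate it and estimate bytes scanned.
from google.cloud import bigquery

client = bigquery.Client(project='kubeflow-dev')  # project ID from the README example

with open('files/select_github_archive.sql', 'r') as f:
    query = f.read()

job_config = bigquery.QueryJobConfig()
job_config.dry_run = True
job_config.use_legacy_sql = False

job = client.query(query, job_config=job_config)
print('Query would process {} bytes'.format(job.total_bytes_processed))
```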

code_search/preprocess/preprocess/pipeline.py

@@ -0,0 +1,117 @@
import os
import apache_beam as beam
import apache_beam.io as io
from apache_beam.options.pipeline_options import StandardOptions, PipelineOptions, \
GoogleCloudOptions, SetupOptions
from apache_beam.io.gcp.internal.clients import bigquery
def create_pipeline_opts(args):
"""Create standard Pipeline Options for Google Cloud Dataflow"""
options = PipelineOptions()
options.view_as(StandardOptions).runner = 'DataflowRunner'
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = args.project
google_cloud_options.job_name = args.job_name
google_cloud_options.temp_location = '{}/{}/temp'.format(args.storage_bucket, args.job_name)
google_cloud_options.staging_location = '{}/{}/staging'.format(args.storage_bucket, args.job_name)
# Point to `setup.py` to allow Dataflow runner to install the package
options.view_as(SetupOptions).setup_file = os.path.join(
os.path.dirname(os.path.dirname(__file__)), 'setup.py')
return options
class SplitRepoPath(beam.DoFn):
# pylint: disable=abstract-method
"""Split the space-delimited file `repo_path` into owner repository (`nwo`)
and file path (`path`)"""
def process(self, element, *args, **kwargs): # pylint: disable=unused-argument,no-self-use
nwo, path = element.pop('repo_path').split(' ', 1)
element['nwo'] = nwo
element['path'] = path
yield element
class TokenizeCodeDocstring(beam.DoFn):
# pylint: disable=abstract-method
"""Compute code/docstring pairs from incoming BigQuery row dict"""
def process(self, element, *args, **kwargs): # pylint: disable=unused-argument,no-self-use
from preprocess.tokenizer import get_function_docstring_pairs
element['pairs'] = get_function_docstring_pairs(element.pop('content'))
yield element
class ExtractFuncInfo(beam.DoFn):
# pylint: disable=abstract-method
"""Convert pair tuples from `TokenizeCodeDocstring` into dict containing query-friendly keys"""
def __init__(self, info_keys):
super(ExtractFuncInfo, self).__init__()
self.info_keys = info_keys
def process(self, element, *args, **kwargs): # pylint: disable=unused-argument
info_rows = [dict(zip(self.info_keys, pair)) for pair in element.pop('pairs')]
info_rows = [self.merge_two_dicts(info_dict, element) for info_dict in info_rows]
info_rows = map(self.dict_to_unicode, info_rows)
yield info_rows
@staticmethod
def merge_two_dicts(dict_a, dict_b):
result = dict_a.copy()
result.update(dict_b)
return result
@staticmethod
def dict_to_unicode(data_dict):
for k, v in data_dict.items():
if isinstance(v, str):
data_dict[k] = v.encode('utf-8', 'ignore')
return data_dict
class BigQueryGithubFiles(beam.PTransform):
"""A collection of `DoFn`s for Pipeline Transform. Reads the Github dataset from BigQuery
and writes back the processed code-docstring pairs in a query-friendly format back to BigQuery
table.
"""
def __init__(self, project, query_string, output_string):
super(BigQueryGithubFiles, self).__init__()
self.project = project
self.query_string = query_string
self.output_dataset, self.output_table = output_string.split(':')
self.data_columns = ['nwo', 'path', 'function_name', 'lineno', 'original_function',
'function_tokens', 'docstring_tokens']
self.data_types = ['STRING', 'STRING', 'STRING', 'INTEGER', 'STRING', 'STRING', 'STRING']
def expand(self, input_or_inputs):
return (input_or_inputs
| "Read BigQuery Rows" >> io.Read(io.BigQuerySource(query=self.query_string,
use_standard_sql=True))
| "Split 'repo_path'" >> beam.ParDo(SplitRepoPath())
| "Tokenize Code/Docstring Pairs" >> beam.ParDo(TokenizeCodeDocstring())
| "Extract Function Info" >> beam.ParDo(ExtractFuncInfo(self.data_columns[2:]))
| "Flatten Rows" >> beam.FlatMap(lambda x: x)
| "Write to BigQuery" >> io.WriteToBigQuery(project=self.project,
dataset=self.output_dataset,
table=self.output_table,
schema=self.create_output_schema())
)
def create_output_schema(self):
table_schema = bigquery.TableSchema()
for column, data_type in zip(self.data_columns, self.data_types):
field_schema = bigquery.TableFieldSchema()
field_schema.name = column
field_schema.type = data_type
field_schema.mode = 'nullable'
table_schema.fields.append(field_schema)
return table_schema
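
For a quick local sanity check, the `DoFn`s above can be exercised directly on a single hand-built row outside of Dataflow. A minimal sketch (the sample `repo_path` and `content` values are made up; it assumes the package is installed and the spaCy `en` model has been downloaded, e.g. via `python setup.py build install`):

```python
from __future__ import print_function
from preprocess.pipeline import SplitRepoPath, TokenizeCodeDocstring, ExtractFuncInfo

# A made-up row shaped like the output of the BigQuery extraction query.
row = {
    'repo_path': 'kubeflow/examples demo/example.py',
    'content': 'def add(a, b):\n    """Add two numbers."""\n    return a + b\n',
}

row = next(SplitRepoPath().process(row))          # pops 'repo_path', adds 'nwo' and 'path'
row = next(TokenizeCodeDocstring().process(row))  # pops 'content', adds 'pairs'

# Same keys as BigQueryGithubFiles.data_columns[2:]
info_keys = ['function_name', 'lineno', 'original_function',
             'function_tokens', 'docstring_tokens']
rows = next(ExtractFuncInfo(info_keys).process(row))  # one dict per extracted function

print(rows[0]['function_name'])     # add
print(rows[0]['docstring_tokens'])  # add two numbers .
```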

code_search/preprocess/preprocess/tokenizer.py

@@ -0,0 +1,38 @@
import ast
import astor
import spacy
from nltk.tokenize import RegexpTokenizer
def tokenize_docstring(text):
"""Apply tokenization using spacy to docstrings."""
en = spacy.load('en')
tokens = en.tokenizer(text.decode('utf8'))
return [token.text.lower() for token in tokens if not token.is_space]
def tokenize_code(text):
"""A very basic procedure for tokenizing code strings."""
return RegexpTokenizer(r'\w+').tokenize(text)
def get_function_docstring_pairs(blob):
"""Extract (function/method, docstring) pairs from a given code blob."""
pairs = []
try:
module = ast.parse(blob)
classes = [node for node in module.body if isinstance(node, ast.ClassDef)]
functions = [node for node in module.body if isinstance(node, ast.FunctionDef)]
for _class in classes:
functions.extend([node for node in _class.body if isinstance(node, ast.FunctionDef)])
for f in functions:
source = astor.to_source(f)
docstring = ast.get_docstring(f) if ast.get_docstring(f) else ''
func = source.replace(ast.get_docstring(f, clean=False), '') if docstring else source
pairs.append((f.name, f.lineno, source, ' '.join(tokenize_code(func)),
' '.join(tokenize_docstring(docstring.split('\n\n')[0]))))
except (AssertionError, MemoryError, SyntaxError, UnicodeEncodeError):
pass
return pairs
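
A small, made-up blob illustrates the shape of the pairs returned by `get_function_docstring_pairs` (assumes the spaCy `en` model is available):

```python
from __future__ import print_function
from preprocess.tokenizer import get_function_docstring_pairs

# A made-up code blob with one method on a class.
blob = (
    'class Greeter(object):\n'
    '    def greet(self, name):\n'
    '        """Say hello.\n'
    '\n'
    '        Only this first paragraph ends up in docstring_tokens.\n'
    '        """\n'
    '        return "Hello, " + name\n'
)

# Each pair is (function_name, lineno, original_function, function_tokens, docstring_tokens).
for name, lineno, _source, _func_tokens, doc_tokens in get_function_docstring_pairs(blob):
    print(name, lineno, doc_tokens)  # greet 2 say hello .
```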

code_search/preprocess/requirements.txt

@@ -0,0 +1,4 @@
astor~=0.6.0
apache-beam[gcp]~=2.4.0
nltk~=3.3.0
spacy~=2.0.0

code_search/preprocess/scripts/process_github_archive.py

@@ -0,0 +1,36 @@
from __future__ import print_function
import argparse
import apache_beam as beam
from preprocess.pipeline import create_pipeline_opts, BigQueryGithubFiles
def parse_arguments(args):
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument('-i', '--input', metavar='', help='Path to BigQuery SQL script')
parser.add_argument('-o', '--output', metavar='',
help='Output string of the format <dataset>:<table>')
parser.add_argument('-p', '--project', metavar='', default='Project', help='Project ID')
parser.add_argument('-j', '--job-name', metavar='', default='Beam Job', help='Job name')
parser.add_argument('--storage-bucket', metavar='', default='gs://bucket',
help='Path to Google Storage Bucket')
parsed_args = parser.parse_args(args)
return parsed_args
def main(args):
args = parse_arguments(args)
pipeline_opts = create_pipeline_opts(args)
with open(args.input, 'r') as f:
query_string = f.read()
pipeline = beam.Pipeline(options=pipeline_opts)
(pipeline | BigQueryGithubFiles(args.project, query_string, args.output)) #pylint: disable=expression-not-assigned
pipeline.run()
if __name__ == '__main__':
import sys
main(sys.argv[1:])

code_search/preprocess/setup.py

@@ -0,0 +1,60 @@
from __future__ import print_function
import subprocess
from distutils.command.build import build as distutils_build #pylint: disable=no-name-in-module
from setuptools import setup, find_packages, Command as SetupToolsCommand
with open('requirements.txt', 'r') as f:
install_requires = f.readlines()
with open('README.md') as f:
README = f.read()
VERSION = '0.1.0'
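# Extra commands run as part of `build` so that workers installing this
# package (e.g. on Dataflow) also fetch the spaCy English model used by the tokenizer.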
CUSTOM_COMMANDS = [
['python', '-m', 'spacy', 'download', 'en']
]
class Build(distutils_build):
sub_commands = distutils_build.sub_commands + [('CustomCommands', None)]
class CustomCommands(SetupToolsCommand):
def initialize_options(self):
pass
def finalize_options(self):
pass
@staticmethod
def run_custom_command(command_list):
print('Running command: %s' % command_list)
p = subprocess.Popen(command_list, stdin=subprocess.PIPE,
stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
stdout_data, _ = p.communicate()
print('Command output: %s' % stdout_data)
if p.returncode != 0:
raise RuntimeError('Command %s failed: exit code: %s' % (command_list, p.returncode))
def run(self):
for command in CUSTOM_COMMANDS:
self.run_custom_command(command)
setup(name='kubeflow-code-search',
description='Kubeflow Code Search Demo',
long_description=README,
long_description_content_type='text/markdown',
url='https://www.github.com/kubeflow/examples',
author='Sanyam Kapoor',
author_email='sanyamkapoor@google.com',
version=VERSION,
license='MIT',
packages=find_packages(),
install_requires=install_requires,
extras_require={},
cmdclass={
'build': Build,
'CustomCommands': CustomCommands,
})