diff --git a/code_search/preprocess/.gitignore b/code_search/preprocess/.gitignore
new file mode 100644
index 00000000..d8a9972e
--- /dev/null
+++ b/code_search/preprocess/.gitignore
@@ -0,0 +1,108 @@
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+# Usually these files are written by a python script from a template
+# before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+.hypothesis/
+.pytest_cache/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+db.sqlite3
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# pyenv
+.python-version
+
+# celery beat schedule file
+celerybeat-schedule
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+
+# Virtual Environments
+venv/
+
diff --git a/code_search/preprocess/MANIFEST.in b/code_search/preprocess/MANIFEST.in
new file mode 100644
index 00000000..9ceceb4e
--- /dev/null
+++ b/code_search/preprocess/MANIFEST.in
@@ -0,0 +1,2 @@
+include README.md requirements.txt
+include files/*
diff --git a/code_search/preprocess/README.md b/code_search/preprocess/README.md
new file mode 100644
index 00000000..5106cdd0
--- /dev/null
+++ b/code_search/preprocess/README.md
@@ -0,0 +1,51 @@
+# Semantic Code Search
+
+Pre-processing pipeline package for end-to-end Semantic Code Search on Kubeflow.
+
+## Prerequisites
+
+* Python 2.7 (with `pip`)
+* Python `virtualenv`
+
+**NOTE**: This package uses Google Cloud Dataflow, whose Python SDK only supports Python 2.7.
+
+## Setup
+
+* Set up a Python virtual environment
+```
+$ virtualenv venv
+$ source venv/bin/activate
+```
+
+* Install the [`gcloud`](https://cloud.google.com/sdk/gcloud/) CLI
+
+* Set up Application Default Credentials
+```
+$ gcloud auth application-default login
+```
+
+* Enable Dataflow via the command line (or use the Google Cloud Console)
+```
+$ gcloud services enable dataflow.googleapis.com
+```
+
+* Build and install the package
+```
+$ python setup.py build install
+```
+
+
+## Execution
+
+Submit a `Dataflow` job using the following command:
+
+```
+$ python scripts/process_github_archive.py -i files/select_github_archive.sql -o code_search:function_docstrings -p kubeflow-dev \
+    -j process-github-archive --storage-bucket gs://kubeflow-dev
+```
+
+**NOTE**: Make sure the project and the Google Cloud Storage bucket already exist.
+
+## Acknowledgements
+
+This project derives from [hamelsmu/code_search](https://github.com/hamelsmu/code_search).
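As a follow-up to the Execution step in the README above: once the Dataflow job completes, the output table can be spot-checked programmatically. This is a minimal sketch, assuming the `google-cloud-bigquery` client library (not among this package's requirements) and the `code_search:function_docstrings` table from the README command; the column names come from the output schema defined in `preprocess/pipeline.py`.

```python
from __future__ import print_function

# Hypothetical extra dependency, not listed in requirements.txt.
from google.cloud import bigquery

client = bigquery.Client(project='kubeflow-dev')
query = """
  SELECT nwo, function_name, docstring_tokens
  FROM `code_search.function_docstrings`
  LIMIT 10
"""
# client.query() uses standard SQL by default; iterate the finished job's rows.
for row in client.query(query).result():
  print(row.nwo, row.function_name, row.docstring_tokens)
```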
diff --git a/code_search/preprocess/files/select_github_archive.sql b/code_search/preprocess/files/select_github_archive.sql
new file mode 100644
index 00000000..1bd3599e
--- /dev/null
+++ b/code_search/preprocess/files/select_github_archive.sql
@@ -0,0 +1,42 @@
+SELECT
+  MAX(CONCAT(f.repo_name, ' ', f.path)) AS repo_path,
+  c.content
+FROM
+  `bigquery-public-data.github_repos.files` AS f
+JOIN
+  `bigquery-public-data.github_repos.contents` AS c
+ON
+  f.id = c.id
+JOIN (
+  --this part of the query makes sure repo is watched at least twice since 2017
+  SELECT
+    repo
+  FROM (
+    SELECT
+      repo.name AS repo
+    FROM
+      `githubarchive.year.2017`
+    WHERE
+      type="WatchEvent"
+    UNION ALL
+    SELECT
+      repo.name AS repo
+    FROM
+      `githubarchive.month.2018*`
+    WHERE
+      type="WatchEvent" )
+  GROUP BY
+    1
+  HAVING
+    COUNT(*) >= 2 ) AS r
+ON
+  f.repo_name = r.repo
+WHERE
+  f.path LIKE '%.py' AND  --with python extension
+  c.size < 15000 AND  --get rid of ridiculously long files
+  REGEXP_CONTAINS(c.content, r'def ')  --contains function definition
+GROUP BY
+  c.content
+-- for development purposes only
+LIMIT
+  1000000
diff --git a/code_search/preprocess/preprocess/__init__.py b/code_search/preprocess/preprocess/__init__.py
new file mode 100644
index 00000000..e69de29b
diff --git a/code_search/preprocess/preprocess/pipeline.py b/code_search/preprocess/preprocess/pipeline.py
new file mode 100644
index 00000000..360e6b8b
--- /dev/null
+++ b/code_search/preprocess/preprocess/pipeline.py
@@ -0,0 +1,117 @@
+import os
+import apache_beam as beam
+import apache_beam.io as io
+from apache_beam.options.pipeline_options import StandardOptions, PipelineOptions, \
+  GoogleCloudOptions, SetupOptions
+from apache_beam.io.gcp.internal.clients import bigquery
+
+
+def create_pipeline_opts(args):
+  """Create standard Pipeline Options for Google Cloud Dataflow"""
+  options = PipelineOptions()
+  options.view_as(StandardOptions).runner = 'DataflowRunner'
+
+  google_cloud_options = options.view_as(GoogleCloudOptions)
+  google_cloud_options.project = args.project
+  google_cloud_options.job_name = args.job_name
+  google_cloud_options.temp_location = '{}/{}/temp'.format(args.storage_bucket, args.job_name)
+  google_cloud_options.staging_location = '{}/{}/staging'.format(args.storage_bucket, args.job_name)
+
+  # Point to `setup.py` to allow Dataflow runner to install the package
+  options.view_as(SetupOptions).setup_file = os.path.join(
+    os.path.dirname(os.path.dirname(__file__)), 'setup.py')
+
+  return options
+
+
+class SplitRepoPath(beam.DoFn):
+  # pylint: disable=abstract-method
+  """Split the space-delimited file `repo_path` into owner repository (`nwo`)
+  and file path (`path`)"""
+
+  def process(self, element, *args, **kwargs):  # pylint: disable=unused-argument,no-self-use
+    nwo, path = element.pop('repo_path').split(' ', 1)
+    element['nwo'] = nwo
+    element['path'] = path
+    yield element
+
+
+class TokenizeCodeDocstring(beam.DoFn):
+  # pylint: disable=abstract-method
+  """Compute code/docstring pairs from incoming BigQuery row dict"""
+
+  def process(self, element, *args, **kwargs):  # pylint: disable=unused-argument,no-self-use
+    from preprocess.tokenizer import get_function_docstring_pairs
+    element['pairs'] = get_function_docstring_pairs(element.pop('content'))
+    yield element
+
+
+class ExtractFuncInfo(beam.DoFn):
+  # pylint: disable=abstract-method
+  """Convert pair tuples from `TokenizeCodeDocstring` into dict containing query-friendly keys"""
+  def __init__(self, info_keys):
+    super(ExtractFuncInfo, self).__init__()
+
+    self.info_keys = info_keys
+
+  def process(self, element, *args, **kwargs):  # pylint: disable=unused-argument
+    info_rows = [dict(zip(self.info_keys, pair)) for pair in element.pop('pairs')]
+    info_rows = [self.merge_two_dicts(info_dict, element) for info_dict in info_rows]
+    info_rows = map(self.dict_to_unicode, info_rows)
+    yield info_rows
+
+  @staticmethod
+  def merge_two_dicts(dict_a, dict_b):
+    result = dict_a.copy()
+    result.update(dict_b)
+    return result
+
+  @staticmethod
+  def dict_to_unicode(data_dict):
+    for k, v in data_dict.items():
+      if isinstance(v, str):
+        data_dict[k] = v.encode('utf-8', 'ignore')
+    return data_dict
+
+
+class BigQueryGithubFiles(beam.PTransform):
+  """A composite transform built from the `DoFn`s above. Reads the Github dataset from BigQuery
+  and writes the processed code/docstring pairs back to a BigQuery table in a query-friendly
+  format.
+  """
+  def __init__(self, project, query_string, output_string):
+    super(BigQueryGithubFiles, self).__init__()
+
+    self.project = project
+    self.query_string = query_string
+    self.output_dataset, self.output_table = output_string.split(':')
+
+    self.data_columns = ['nwo', 'path', 'function_name', 'lineno', 'original_function',
+                         'function_tokens', 'docstring_tokens']
+    self.data_types = ['STRING', 'STRING', 'STRING', 'INTEGER', 'STRING', 'STRING', 'STRING']
+
+  def expand(self, input_or_inputs):
+    return (input_or_inputs
+      | "Read BigQuery Rows" >> io.Read(io.BigQuerySource(query=self.query_string,
+                                                          use_standard_sql=True))
+      | "Split 'repo_path'" >> beam.ParDo(SplitRepoPath())
+      | "Tokenize Code/Docstring Pairs" >> beam.ParDo(TokenizeCodeDocstring())
+      | "Extract Function Info" >> beam.ParDo(ExtractFuncInfo(self.data_columns[2:]))
+      | "Flatten Rows" >> beam.FlatMap(lambda x: x)
+      | "Write to BigQuery" >> io.WriteToBigQuery(project=self.project,
+                                                  dataset=self.output_dataset,
+                                                  table=self.output_table,
+                                                  schema=self.create_output_schema())
+    )
+
+  def create_output_schema(self):
+    table_schema = bigquery.TableSchema()
+
+    for column, data_type in zip(self.data_columns, self.data_types):
+      field_schema = bigquery.TableFieldSchema()
+      field_schema.name = column
+      field_schema.type = data_type
+      field_schema.mode = 'nullable'
+      table_schema.fields.append(field_schema)
+
+    return table_schema
diff --git a/code_search/preprocess/preprocess/tokenizer.py b/code_search/preprocess/preprocess/tokenizer.py
new file mode 100644
index 00000000..44404bba
--- /dev/null
+++ b/code_search/preprocess/preprocess/tokenizer.py
@@ -0,0 +1,38 @@
+import ast
+import astor
+import spacy
+from nltk.tokenize import RegexpTokenizer
+
+
+def tokenize_docstring(text):
+  """Apply tokenization using spacy to docstrings."""
+  en = spacy.load('en')
+  tokens = en.tokenizer(text.decode('utf8'))
+  return [token.text.lower() for token in tokens if not token.is_space]
+
+
+def tokenize_code(text):
+  """A very basic procedure for tokenizing code strings."""
+  return RegexpTokenizer(r'\w+').tokenize(text)
+
+
+def get_function_docstring_pairs(blob):
+  """Extract (function/method, docstring) pairs from a given code blob."""
+  pairs = []
+  try:
+    module = ast.parse(blob)
+    classes = [node for node in module.body if isinstance(node, ast.ClassDef)]
+    functions = [node for node in module.body if isinstance(node, ast.FunctionDef)]
+    for _class in classes:
+      functions.extend([node for node in _class.body if isinstance(node, ast.FunctionDef)])
+
+    for f in functions:
+      source = astor.to_source(f)
+      docstring = ast.get_docstring(f) if ast.get_docstring(f) else ''
+      func = source.replace(ast.get_docstring(f, clean=False), '') if docstring else source
+
+      pairs.append((f.name, f.lineno, source, ' '.join(tokenize_code(func)),
+                    ' '.join(tokenize_docstring(docstring.split('\n\n')[0]))))
+  except (AssertionError, MemoryError, SyntaxError, UnicodeEncodeError):
+    pass
+  return pairs
diff --git a/code_search/preprocess/requirements.txt b/code_search/preprocess/requirements.txt
new file mode 100644
index 00000000..953bdbb0
--- /dev/null
+++ b/code_search/preprocess/requirements.txt
@@ -0,0 +1,4 @@
+astor~=0.6.0
+apache-beam[gcp]~=2.4.0
+nltk~=3.3.0
+spacy~=2.0.0
diff --git a/code_search/preprocess/scripts/process_github_archive.py b/code_search/preprocess/scripts/process_github_archive.py
new file mode 100644
index 00000000..733c7743
--- /dev/null
+++ b/code_search/preprocess/scripts/process_github_archive.py
@@ -0,0 +1,36 @@
+from __future__ import print_function
+import argparse
+import apache_beam as beam
+
+from preprocess.pipeline import create_pipeline_opts, BigQueryGithubFiles
+
+
+def parse_arguments(args):
+  parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
+  parser.add_argument('-i', '--input', metavar='', help='Path to BigQuery SQL script')
+  parser.add_argument('-o', '--output', metavar='',
+                      help='Output string of the format <dataset>:<table>')
+  parser.add_argument('-p', '--project', metavar='', default='Project', help='Project ID')
+  parser.add_argument('-j', '--job-name', metavar='', default='Beam Job', help='Job name')
+  parser.add_argument('--storage-bucket', metavar='', default='gs://bucket',
+                      help='Path to Google Storage Bucket')
+
+  parsed_args = parser.parse_args(args)
+  return parsed_args
+
+
+def main(args):
+  args = parse_arguments(args)
+  pipeline_opts = create_pipeline_opts(args)
+
+  with open(args.input, 'r') as f:
+    query_string = f.read()
+
+  pipeline = beam.Pipeline(options=pipeline_opts)
+  (pipeline | BigQueryGithubFiles(args.project, query_string, args.output))  #pylint: disable=expression-not-assigned
+  pipeline.run()
+
+
+if __name__ == '__main__':
+  import sys
+  main(sys.argv[1:])
diff --git a/code_search/preprocess/setup.py b/code_search/preprocess/setup.py
new file mode 100644
index 00000000..a54f35e4
--- /dev/null
+++ b/code_search/preprocess/setup.py
@@ -0,0 +1,60 @@
+from __future__ import print_function
+import subprocess
+from distutils.command.build import build as distutils_build  #pylint: disable=no-name-in-module
+from setuptools import setup, find_packages, Command as SetupToolsCommand
+
+
+with open('requirements.txt', 'r') as f:
+  install_requires = f.readlines()
+
+with open('README.md') as f:
+  README = f.read()
+
+VERSION = '0.1.0'
+CUSTOM_COMMANDS = [
+  ['python', '-m', 'spacy', 'download', 'en']
+]
+
+
+class Build(distutils_build):
+  sub_commands = distutils_build.sub_commands + [('CustomCommands', None)]
+
+
+class CustomCommands(SetupToolsCommand):
+  def initialize_options(self):
+    pass
+
+  def finalize_options(self):
+    pass
+
+  @staticmethod
+  def run_custom_command(command_list):
+    print('Running command: %s' % command_list)
+    p = subprocess.Popen(command_list, stdin=subprocess.PIPE,
+                         stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
+    stdout_data, _ = p.communicate()
+    print('Command output: %s' % stdout_data)
+    if p.returncode != 0:
+      raise RuntimeError('Command %s failed: exit code: %s' % (command_list, p.returncode))
+
+  def run(self):
+    for command in CUSTOM_COMMANDS:
+      self.run_custom_command(command)
+
+
+setup(name='kubeflow-code-search',
+      description='Kubeflow Code Search Demo',
+      long_description=README,
+      long_description_content_type='text/markdown',
+      url='https://www.github.com/kubeflow/examples',
+      author='Sanyam Kapoor',
+      author_email='sanyamkapoor@google.com',
+      version=VERSION,
+      license='MIT',
+      packages=find_packages(),
+      install_requires=install_requires,
+      extras_require={},
+      cmdclass={
+        'build': Build,
+        'CustomCommands': CustomCommands,
+      })
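The tokenization step added in `preprocess/tokenizer.py` can be exercised locally (under Python 2.7, per the README) before submitting a full Dataflow job. This is a minimal sketch, assuming the package has been installed with `python setup.py build install` (whose custom build command also downloads the spaCy `en` model); the sample blob is made up for illustration.

```python
from __future__ import print_function

from preprocess.tokenizer import get_function_docstring_pairs

# Made-up sample standing in for the `content` column of one BigQuery row.
SAMPLE_BLOB = '''
def add(a, b):
    """Return the sum of a and b."""
    return a + b
'''

# Each tuple mirrors the last five output columns of the pipeline:
# function_name, lineno, original_function, function_tokens, docstring_tokens.
for pair in get_function_docstring_pairs(SAMPLE_BLOB):
  print(pair)
```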