Add num workers options to Dataflow (#125)

Sanyam Kapoor 2018-06-05 17:05:56 -07:00 committed by k8s-ci-robot
parent bb7451c0ba
commit 17dd02b803
5 changed files with 18 additions and 11 deletions

README.md

@@ -40,8 +40,9 @@ $ python setup.py build install
 Submit a `Dataflow` job using the following command
 ```
-$ python scripts/process_github_archive.py -i files/select_github_archive.sql -o code_search:function_docstrings -p kubeflow-dev \
-    -j process-github-archive --storage-bucket gs://kubeflow-dev
+$ python scripts/process_github_archive.py -i files/select_github_archive.sql -o code_search:function_docstrings \
+    -p kubeflow-dev -j process-github-archive --storage-bucket gs://kubeflow-dev \
+    --machine-type n1-highcpu-32 --num-workers 16 --max-num-workers 16
 ```
 **NOTE**: Make sure the Project and Google Storage Bucket are created.
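
For context (not part of this commit): the three new flags are forwarded to Beam's `WorkerOptions` by `create_pipeline_opts` below. A minimal sketch of the equivalent settings expressed as Beam's own native pipeline flags, assuming the standard `apache_beam` option names (`--worker_machine_type` is Beam's spelling of the machine-type flag):

```
from apache_beam.options.pipeline_options import PipelineOptions

# The same worker settings passed as Beam's native flags rather than
# through this repo's argparse wrapper in scripts/process_github_archive.py.
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=kubeflow-dev',
    '--num_workers=16',
    '--max_num_workers=16',
    '--worker_machine_type=n1-highcpu-32',
])
```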

files/select_github_archive.sql

@@ -37,6 +37,3 @@ WHERE
   REGEXP_CONTAINS(c.content, r'def ') --contains function definition
 GROUP BY
   c.content
--- for development purposes only
-LIMIT
-  1000000
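
With the development-only `LIMIT` removed, the pipeline now reads the full query result. A hypothetical sketch of how such a SQL script could feed a Beam BigQuery read; the actual transform is `BigQueryGithubFiles` in `preprocess/pipeline.py`, and the helper name here is illustrative only:

```
import apache_beam as beam

def read_github_files(pipeline, query_file):
  # Load the SQL script (e.g. files/select_github_archive.sql) from disk.
  with open(query_file) as f:
    query = f.read()
  # use_standard_sql=True because the script relies on REGEXP_CONTAINS,
  # a BigQuery Standard SQL function.
  return (pipeline
          | 'ReadGithubDataset' >> beam.io.Read(
              beam.io.BigQuerySource(query=query, use_standard_sql=True)))
```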

preprocess/pipeline.py

@@ -2,7 +2,7 @@ import os
 import apache_beam as beam
 import apache_beam.io as io
 from apache_beam.options.pipeline_options import StandardOptions, PipelineOptions, \
-  GoogleCloudOptions, SetupOptions
+  GoogleCloudOptions, SetupOptions, WorkerOptions
 from apache_beam.io.gcp.internal.clients import bigquery
@@ -17,6 +17,10 @@ def create_pipeline_opts(args):
   google_cloud_options.temp_location = '{}/{}/temp'.format(args.storage_bucket, args.job_name)
   google_cloud_options.staging_location = '{}/{}/staging'.format(args.storage_bucket, args.job_name)
 
+  options.view_as(WorkerOptions).num_workers = args.num_workers
+  options.view_as(WorkerOptions).max_num_workers = args.max_num_workers
+  options.view_as(WorkerOptions).machine_type = args.machine_type
+
   # Point to `setup.py` to allow Dataflow runner to install the package
   options.view_as(SetupOptions).setup_file = os.path.join(
     os.path.dirname(os.path.dirname(__file__)), 'setup.py')
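
A usage sketch (illustrative, assuming `create_pipeline_opts` returns the configured `PipelineOptions` and is paired with `parse_arguments` from `scripts/process_github_archive.py`):

```
import apache_beam as beam

args = parse_arguments([
    '-i', 'files/select_github_archive.sql',
    '-o', 'code_search:function_docstrings',
    '-p', 'kubeflow-dev', '-j', 'process-github-archive',
    '--storage-bucket', 'gs://kubeflow-dev',
    '--num-workers', '16', '--max-num-workers', '16',
    '--machine-type', 'n1-highcpu-32',
])
pipeline_opts = create_pipeline_opts(args)

# The options carry the WorkerOptions set above into the Dataflow runner.
with beam.Pipeline(options=pipeline_opts) as pipeline:
  pass  # BigQueryGithubFiles and downstream transforms go here
```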

scripts/process_github_archive.py

@@ -7,13 +7,18 @@ from preprocess.pipeline import create_pipeline_opts, BigQueryGithubFiles
 def parse_arguments(args):
   parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
-  parser.add_argument('-i', '--input', metavar='', help='Path to BigQuery SQL script')
-  parser.add_argument('-o', '--output', metavar='',
+  parser.add_argument('-i', '--input', metavar='', type=str, help='Path to BigQuery SQL script')
+  parser.add_argument('-o', '--output', metavar='', type=str,
                       help='Output string of the format <dataset>:<table>')
-  parser.add_argument('-p', '--project', metavar='', default='Project', help='Project ID')
-  parser.add_argument('-j', '--job-name', metavar='', default='Beam Job', help='Job name')
-  parser.add_argument('--storage-bucket', metavar='', default='gs://bucket',
+  parser.add_argument('-p', '--project', metavar='', type=str, default='Project', help='Project ID')
+  parser.add_argument('-j', '--job-name', metavar='', type=str, default='Beam Job', help='Job name')
+  parser.add_argument('--storage-bucket', metavar='', type=str, default='gs://bucket',
                       help='Path to Google Storage Bucket')
+  parser.add_argument('--num-workers', metavar='', type=int, default=1, help='Number of workers')
+  parser.add_argument('--max-num-workers', metavar='', type=int, default=1,
+                      help='Maximum number of workers')
+  parser.add_argument('--machine-type', metavar='', type=str, default='n1-standard-1',
+                      help='Google Cloud Machine Type to use')
 
   parsed_args = parser.parse_args(args)
   return parsed_args
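
A quick check of the new arguments (illustrative): `type=int` makes argparse convert the worker counts, and the dashes in flag names become underscores on the parsed namespace:

```
args = parse_arguments(['-i', 'query.sql', '-o', 'dataset:table'])
print(args.num_workers, args.max_num_workers, args.machine_type)
# Defaults: 1 1 n1-standard-1

args = parse_arguments(['-i', 'query.sql', '-o', 'dataset:table',
                        '--num-workers', '16'])
print(type(args.num_workers), args.num_workers)
# <class 'int'> 16
```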