# GCP Dataflow Component Sample
A Kubeflow Pipeline component that prepares data by submitting an Apache Beam job (authored in Python) to Cloud Dataflow for execution. The Python Beam code is run with Cloud Dataflow Runner.

## Intended use

Use this component to run Python Beam code that submits a Cloud Dataflow job as a step of a Kubeflow pipeline.

## Runtime arguments
Name | Description | Optional | Data type | Accepted values | Default |
:--- | :----------| :----------| :----------| :----------| :---------- |
python_file_path | The path to the Cloud Storage bucket or local directory containing the Python file to be run. | No | GCSPath | | |
project_id | The ID of the Google Cloud Platform (GCP) project containing the Cloud Dataflow job. | No | GCPProjectID | | |
staging_dir | The path to the Cloud Storage directory where the staging files are stored. A random subdirectory is created under the staging directory to hold the job information, so that you can resume the job in case of failure. `staging_dir` is passed to the Beam code as the command-line arguments `staging_location` and `temp_location`. | Yes | GCSPath | | None |
requirements_file_path | The path to the Cloud Storage bucket or local directory containing the pip requirements file. | Yes | GCSPath | | None |
args | The list of arguments to pass to the Python file. | Yes | List | A list of string arguments | None |
wait_interval | The number of seconds to wait between calls to get the status of the job. | Yes | Integer | | 30 |
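
The `args` value is a list of strings, and the example pipeline later in this document passes it as a JSON-encoded string. A minimal sketch of building such a value follows; the bucket name `my-bucket` is a placeholder for illustration, not a real resource.

```python
import json

# Placeholder output location; replace with a path in your own bucket.
output_file = 'gs://my-bucket/wc/wordcount.out'

# Each flag and each value is a separate string element of the list.
args = json.dumps(['--output', output_file])
print(args)
```

Keeping each flag and value as its own list element avoids any shell-style quoting issues when the arguments are handed to the Python file.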

## Input data schema

Before you use the component, the following files must be ready in a Cloud Storage bucket:
- A Beam Python code file.
- A `requirements.txt` file that lists the dependent packages.

The Beam Python code should follow the [Beam programming guide](https://beam.apache.org/documentation/programming-guide/) as well as the following additional requirements to be compatible with this component:
- It accepts the command line arguments `--project`, `--temp_location`, `--staging_location`, which are [standard Dataflow Runner options](https://cloud.google.com/dataflow/docs/guides/specifying-exec-params#setting-other-cloud-pipeline-options).
- It enables `info logging` before the start of a Cloud Dataflow job in the Python code. This allows the component to track the status and ID of the job that is created. For example, call `logging.getLogger().setLevel(logging.INFO)` before any other code.
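
The two requirements above can be sketched as follows. This is a minimal skeleton, not the wordcount sample itself: the `--output` flag, the `parse_args` and `run` helper names, and the toy pipeline are assumptions made for illustration. Flags that the script does not define, such as `--project`, `--temp_location`, and `--staging_location`, are left over by `parse_known_args` and forwarded to Beam's `PipelineOptions`.

```python
import argparse
import logging

# Enable INFO logging before any other code so that job details are logged.
logging.getLogger().setLevel(logging.INFO)


def parse_args(argv=None):
    """Split script-specific flags from the flags meant for the runner."""
    parser = argparse.ArgumentParser()
    # Hypothetical script-specific flag, used only for this sketch.
    parser.add_argument('--output', required=True)
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args


def run(argv=None):
    """Build and run a toy pipeline with the forwarded runner options."""
    known_args, pipeline_args = parse_args(argv)

    # Imported here so that parse_args can be used without Beam installed.
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions

    options = PipelineOptions(pipeline_args)
    with beam.Pipeline(options=options) as p:
        (p
         | 'Create' >> beam.Create(['hello', 'world'])
         | 'Write' >> beam.io.WriteToText(known_args.output))
```

A real script would typically call `run()` from an `if __name__ == '__main__':` block at the end of the file.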


## Output
Name | Description
:--- | :----------
job_id | The ID of the Cloud Dataflow job that is created.

## Cautions & requirements
To use the component, the following requirements must be met:
- Cloud Dataflow API is enabled.
- The component runs with the credentials of a Kubeflow user service account, supplied as a secret, in a Kubeflow Pipeline cluster. For example:
```
component_op(...).apply(gcp.use_gcp_secret('user-gcp-sa'))
```
The Kubeflow user service account is a member of:
- `roles/dataflow.developer` role of the project.
- `roles/storage.objectViewer` role of the Cloud Storage Objects `python_file_path` and `requirements_file_path`.
- `roles/storage.objectCreator` role of the Cloud Storage Object `staging_dir`. 

## Detailed description
During execution, the component:
- Downloads `python_file_path` and `requirements_file_path` to local files.
- Starts a subprocess to launch the Python program.
- Monitors the logs produced from the subprocess to extract the Cloud Dataflow job information.
- Stores the Cloud Dataflow job information in `staging_dir` so the job can be resumed in case of failure.
- Waits for the job to finish.
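
The launch, log-monitoring, and waiting steps above can be sketched roughly as follows. This is an illustration under stated assumptions, not the component's actual implementation: the log line format matched by `JOB_URL_PATTERN`, the helper names, and the injected `get_state` callable are all hypothetical, and the set of terminal states is only a subset of the Dataflow job states.

```python
import re
import subprocess
import time

# Assumption: the launched program logs a Cloud Console URL of the form
# .../dataflow/jobs/<region>/<job_id>. The real log format may differ.
JOB_URL_PATTERN = re.compile(
    r'console\.cloud\.google\.com/dataflow/jobs/[^/\s]+/([A-Za-z0-9_\-]+)')

# A subset of Dataflow job states, treated as terminal in this sketch.
TERMINAL_STATES = {'JOB_STATE_DONE', 'JOB_STATE_FAILED', 'JOB_STATE_CANCELLED'}


def extract_job_id(line):
    """Return the job ID found in a log line, or None if there is none."""
    match = JOB_URL_PATTERN.search(line)
    return match.group(1) if match else None


def launch_and_find_job_id(cmd):
    """Run cmd and scan its combined stdout/stderr for a job ID."""
    process = subprocess.Popen(
        cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True)
    job_id = None
    for line in process.stdout:
        if job_id is None:
            job_id = extract_job_id(line)
    process.wait()
    return job_id


def wait_for_job(get_state, wait_interval=30, sleep=time.sleep):
    """Poll get_state() every wait_interval seconds until a terminal state."""
    while True:
        state = get_state()
        if state in TERMINAL_STATES:
            return state
        sleep(wait_interval)
```

Passing `get_state` and `sleep` as parameters keeps the polling loop independent of any particular client library, so `wait_interval` can be exercised without calling the Dataflow API.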

## Setup

```
project = 'Input your PROJECT ID'
output = 'Input your GCS bucket name'  # A Cloud Storage path such as gs://<your-bucket>, with no trailing slash
experiment_name = 'Dataflow - Launch Python'
```

## Install Pipeline SDK

```
%%capture --no-stderr
!pip3 install kfp --upgrade
```


## Load the component using KFP SDK


```
import kfp.components as comp

dataflow_python_op = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/e598176c02f45371336ccaa819409e8ec83743df/components/gcp/dataflow/launch_python/component.yaml')
help(dataflow_python_op)
```

```
Help on function Launch Python:

Launch Python(python_file_path:str, project_id:'GCPProjectID', staging_dir:'GCSPath'='', requirements_file_path:'GCSPath'='', args:list='[]', wait_interval:int='30')
    Launch Python
    Launch a self-executing beam python file.
```



## Use the wordcount Python sample
This sample runs wordcount sample code in a Kubeflow Pipeline and stores the output in a Cloud Storage bucket. Here is the sample code:

```
!gsutil cat gs://ml-pipeline-playground/samples/dataflow/wc/wc.py
```

```
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

"""A minimalist word-counting workflow that counts words in Shakespeare.

This is the first in a series of successively more detailed 'word count'
examples.

Next, see the wordcount pipeline, t
```

## Example pipeline that uses the component

```
import json

import kfp.dsl as dsl
import kfp.gcp as gcp

output_file = '{}/wc/wordcount.out'.format(output)


@dsl.pipeline(
    name='Dataflow launch python pipeline',
    description='Dataflow launch python pipeline'
)
def pipeline(
    python_file_path = 'gs://ml-pipeline-playground/samples/dataflow/wc/wc.py',
    project_id = project,
    staging_dir = output,
    requirements_file_path = 'gs://ml-pipeline-playground/samples/dataflow/wc/requirements.txt',
    args = json.dumps([
        '--output', output_file
    ]),
    wait_interval = 30
):
    dataflow_python_op(
        python_file_path = python_file_path,
        project_id = project_id,
        staging_dir = staging_dir,
        requirements_file_path = requirements_file_path,
        args = args,
        wait_interval = wait_interval).apply(gcp.use_gcp_secret('user-gcp-sa'))
```

## Compile the pipeline

```
import kfp.compiler as compiler

pipeline_func = pipeline
pipeline_filename = pipeline_func.__name__ + '.zip'
compiler.Compiler().compile(pipeline_func, pipeline_filename)
```

## Submit the pipeline for execution

```
import kfp

# Specify pipeline argument values
arguments = {}

# Get or create an experiment
client = kfp.Client()
experiment = client.create_experiment(experiment_name)

# Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)
```

## Inspect the output

```
!gsutil cat $output_file
```

## References
* [Component Python code](https://github.com/kubeflow/pipelines/blob/master/components/gcp/container/component_sdk/python/kfp_component/google/dataflow/_launch_python.py)
* [Component Dockerfile](https://github.com/kubeflow/pipelines/blob/master/components/gcp/container/Dockerfile)
* [Sample notebook](https://github.com/kubeflow/pipelines/blob/master/components/gcp/dataflow/launch_python/sample.ipynb)
* [Dataflow Python Quickstart](https://cloud.google.com/dataflow/docs/quickstarts/quickstart-python)