{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# GCP Dataflow Component Sample\n", "A Kubeflow Pipeline component that prepares data by submitting an Apache Beam job (authored in Python) to Cloud Dataflow for execution. The Python Beam code is run with Cloud Dataflow Runner.\n", "\n", "## Intended use\n", "\n", "Use this component to run a Python Beam code to submit a Cloud Dataflow job as a step of a Kubeflow pipeline. \n", "\n", "## Runtime arguments\n", "Name | Description | Optional | Data type| Accepted values | Default |\n", ":--- | :----------| :----------| :----------| :----------| :---------- |\n", "python_file_path | The path to the Cloud Storage bucket or local directory containing the Python file to be run. | | GCSPath | | |\n", "project_id | The ID of the Google Cloud Platform (GCP) project containing the Cloud Dataflow job.| | String | | |\n", "region | The Google Cloud Platform (GCP) region to run the Cloud Dataflow job.| | String | | |\n", "staging_dir | The path to the Cloud Storage directory where the staging files are stored. A random subdirectory will be created under the staging directory to keep the job information.This is done so that you can resume the job in case of failure. `staging_dir` is passed as the command line arguments (`staging_location` and `temp_location`) of the Beam code. | Yes | GCSPath | | None |\n", "requirements_file_path | The path to the Cloud Storage bucket or local directory containing the pip requirements file. | Yes | GCSPath | | None |\n", "args | The list of arguments to pass to the Python file. | No | List | A list of string arguments | None |\n", "wait_interval | The number of seconds to wait between calls to get the status of the job. | Yes | Integer | | 30 |\n", "\n", "## Input data schema\n", "\n", "Before you use the component, the following files must be ready in a Cloud Storage bucket:\n", "- A Beam Python code file.\n", "- A `requirements.txt` file which includes a list of dependent packages.\n", "\n", "The Beam Python code should follow the [Beam programming guide](https://beam.apache.org/documentation/programming-guide/) as well as the following additional requirements to be compatible with this component:\n", "- It accepts the command line arguments `--project`, `--region`, `--temp_location`, `--staging_location`, which are [standard Dataflow Runner options](https://cloud.google.com/dataflow/docs/guides/specifying-exec-params#setting-other-cloud-pipeline-options).\n", "- It enables `info logging` before the start of a Cloud Dataflow job in the Python code. This is important to allow the component to track the status and ID of the job that is created. For example, calling `logging.getLogger().setLevel(logging.INFO)` before any other code.\n", "\n", "\n", "## Output\n", "Name | Description\n", ":--- | :----------\n", "job_id | The id of the Cloud Dataflow job that is created.\n", "\n", "## Cautions & requirements\n", "To use the components, the following requirements must be met:\n", "- Cloud Dataflow API is enabled.\n", "- The component is running under a secret Kubeflow user service account in a Kubeflow Pipeline cluster. For example:\n", "```\n", "component_op(...)\n", "```\n", "The Kubeflow user service account is a member of:\n", "- `roles/dataflow.developer` role of the project.\n", "- `roles/storage.objectViewer` role of the Cloud Storage Objects `python_file_path` and `requirements_file_path`.\n", "- `roles/storage.objectCreator` role of the Cloud Storage Object `staging_dir`. \n", "\n", "## Detailed description\n", "The component does several things during the execution:\n", "- Downloads `python_file_path` and `requirements_file_path` to local files.\n", "- Starts a subprocess to launch the Python program.\n", "- Monitors the logs produced from the subprocess to extract the Cloud Dataflow job information.\n", "- Stores the Cloud Dataflow job information in `staging_dir` so the job can be resumed in case of failure.\n", "- Waits for the job to finish.\n", "\n", "# Setup" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" }, "tags": [ "parameters" ] }, "outputs": [], "source": [ "project = 'Input your PROJECT ID'\n", "region = 'Input GCP region' # For example, 'us-central1'\n", "output = 'Input your GCS bucket name' # No ending slash" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%% md\n" } }, "source": [ "## Install Pipeline SDK" ] }, { "cell_type": "code", "execution_count": 2, "metadata": { "tags": [ "skip-in-test" ] }, "outputs": [], "source": [ "!python3 -m pip install 'kfp>=0.1.31' --quiet" ] }, { "cell_type": "markdown", "metadata": { "pycharm": { "name": "#%%\n" } }, "source": [ "\n", "## Load the component using KFP SDK\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "import kfp.deprecated.components as comp\n", "\n", "dataflow_python_op = comp.load_component_from_url(\n", " 'https://raw.githubusercontent.com/kubeflow/pipelines/1.7.0-rc.3/components/gcp/dataflow/launch_python/component.yaml')" ] }, { "cell_type": "code", "execution_count": 11, "metadata": { "tags": [ "skip-in-test" ] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Help on function Launch Python:\n", "\n", "Launch Python(python_file_path: str, project_id: str, region: str, staging_dir: 'GCSPath' = '', requirements_file_path: 'GCSPath' = '', args: list = '[]', wait_interval: int = '30')\n", " Launch Python\n", " Launch a self-executing beam python file.\n", "\n" ] } ], "source": [ "help(dataflow_python_op)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Use the wordcount python sample\n", "In this sample, we run a wordcount sample code in a Kubeflow Pipeline. The output will be stored in a Cloud Storage bucket. Here is the sample code:" ] }, { "cell_type": "code", "execution_count": 4, "metadata": { "tags": [ "skip-in-test" ] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "#\r\n", "# Licensed to the Apache Software Foundation (ASF) under one or more\r\n", "# contributor license agreements. See the NOTICE file distributed with\r\n", "# this work for additional information regarding copyright ownership.\r\n", "# The ASF licenses this file to You under the Apache License, Version 2.0\r\n", "# (the \"License\"); you may not use this file except in compliance with\r\n", "# the License. You may obtain a copy of the License at\r\n", "#\r\n", "# http://www.apache.org/licenses/LICENSE-2.0\r\n", "#\r\n", "# Unless required by applicable law or agreed to in writing, software\r\n", "# distributed under the License is distributed on an \"AS IS\" BASIS,\r\n", "# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r\n", "# See the License for the specific language governing permissions and\r\n", "# limitations under the License.\r\n", "#\r\n", "\r\n", "\"\"\"A minimalist word-counting workflow that counts words in Shakespeare.\r\n", "\r\n", "This is the first in a series of successively more detailed 'word count'\r\n", "examples.\r\n", "\r\n", "Next, see the wordcount pipeline, then the wordcount_debugging pipeline, for\r\n", "more detailed examples that introduce additional concepts.\r\n", "\r\n", "Concepts:\r\n", "\r\n", "1. Reading data from text files\r\n", "2. Specifying 'inline' transforms\r\n", "3. Counting a PCollection\r\n", "4. Writing data to Cloud Storage as text files\r\n", "\r\n", "To execute this pipeline locally, first edit the code to specify the output\r\n", "location. Output location could be a local file path or an output prefix\r\n", "on GCS. (Only update the output location marked with the first CHANGE comment.)\r\n", "\r\n", "To execute this pipeline remotely, first edit the code to set your project ID,\r\n", "runner type, the staging location, the temp location, and the output location.\r\n", "The specified GCS bucket(s) must already exist. (Update all the places marked\r\n", "with a CHANGE comment.)\r\n", "\r\n", "Then, run the pipeline as described in the README. It will be deployed and run\r\n", "using the Google Cloud Dataflow Service. No args are required to run the\r\n", "pipeline. You can see the results in your output bucket in the GCS browser.\r\n", "\"\"\"\r\n", "\r\n", "from __future__ import absolute_import\r\n", "\r\n", "import argparse\r\n", "import logging\r\n", "import re\r\n", "\r\n", "from past.builtins import unicode\r\n", "\r\n", "import apache_beam as beam\r\n", "from apache_beam.io import ReadFromText\r\n", "from apache_beam.io import WriteToText\r\n", "from apache_beam.options.pipeline_options import PipelineOptions\r\n", "from apache_beam.options.pipeline_options import SetupOptions\r\n", "\r\n", "\r\n", "def run(argv=None):\r\n", " \"\"\"Main entry point; defines and runs the wordcount pipeline.\"\"\"\r\n", "\r\n", " parser = argparse.ArgumentParser()\r\n", " parser.add_argument('--input',\r\n", " dest='input',\r\n", " default='gs://dataflow-samples/shakespeare/kinglear.txt',\r\n", " help='Input file to process.')\r\n", " parser.add_argument('--output',\r\n", " dest='output',\r\n", " # CHANGE 1/6: The Google Cloud Storage path is required\r\n", " # for outputting the results.\r\n", " default='gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX',\r\n", " help='Output file to write results to.')\r\n", " known_args, pipeline_args = parser.parse_known_args(argv)\r\n", " # pipeline_args.extend([\r\n", " # # CHANGE 2/6: (OPTIONAL) Change this to DataflowRunner to\r\n", " # # run your pipeline on the Google Cloud Dataflow Service.\r\n", " # '--runner=DirectRunner',\r\n", " # # CHANGE 3/6: Your project ID is required in order to run your pipeline on\r\n", " # # the Google Cloud Dataflow Service.\r\n", " # '--project=SET_YOUR_PROJECT_ID_HERE',\r\n", " # # CHANGE 4/6: A GCP region is required in order to run your pipeline on\r\n", " # # the Google Cloud Dataflow Service.\r\n", " # '--region=SET_GCP_REGION_HERE',\r\n", " # # CHANGE 5/6: Your Google Cloud Storage path is required for staging local\r\n", " # # files.\r\n", " # '--staging_location=gs://YOUR_BUCKET_NAME/AND_STAGING_DIRECTORY',\r\n", " # # CHANGE 6/6: Your Google Cloud Storage path is required for temporary\r\n", " # # files.\r\n", " # '--temp_location=gs://YOUR_BUCKET_NAME/AND_TEMP_DIRECTORY',\r\n", " # '--job_name=your-wordcount-job',\r\n", " # ])\r\n", "\r\n", " # We use the save_main_session option because one or more DoFn's in this\r\n", " # workflow rely on global context (e.g., a module imported at module level).\r\n", " pipeline_options = PipelineOptions(pipeline_args)\r\n", " pipeline_options.view_as(SetupOptions).save_main_session = True\r\n", " with beam.Pipeline(options=pipeline_options) as p:\r\n", "\r\n", " # Read the text file[pattern] into a PCollection.\r\n", " lines = p | ReadFromText(known_args.input)\r\n", "\r\n", " # Count the occurrences of each word.\r\n", " counts = (\r\n", " lines\r\n", " | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\\']+', x))\r\n", " .with_output_types(unicode))\r\n", " | 'PairWithOne' >> beam.Map(lambda x: (x, 1))\r\n", " | 'GroupAndSum' >> beam.CombinePerKey(sum))\r\n", "\r\n", " # Format the counts into a PCollection of strings.\r\n", " def format_result(word_count):\r\n", " (word, count) = word_count\r\n", " return '%s: %s' % (word, count)\r\n", "\r\n", " output = counts | 'Format' >> beam.Map(format_result)\r\n", "\r\n", " # Write the output using a \"Write\" transform that has side effects.\r\n", " # pylint: disable=expression-not-assigned\r\n", " output | WriteToText(known_args.output)\r\n", "\r\n", "\r\n", "if __name__ == '__main__':\r\n", " logging.getLogger().setLevel(logging.INFO)\r\n", " run()\r\n" ] } ], "source": [ "!gsutil cat gs://ml-pipeline/sample-pipeline/word-count/wc.py" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Example pipeline that uses the component" ] }, { "cell_type": "code", "execution_count": 5, "metadata": {}, "outputs": [], "source": [ "import kfp.deprecated as kfp\n", "from kfp.deprecated import dsl, Client\n", "import json\n", "@dsl.pipeline(\n", " name='dataflow-launch-python-pipeline',\n", " description='Dataflow launch python pipeline'\n", ")\n", "def pipeline(\n", " python_file_path = 'gs://ml-pipeline/sample-pipeline/word-count/wc.py',\n", " project_id = project,\n", " region = region,\n", " staging_dir = output,\n", " requirements_file_path = 'gs://ml-pipeline/sample-pipeline/word-count/requirements.txt',\n", " wait_interval = 30\n", "):\n", " dataflow_python_op(\n", " python_file_path = python_file_path, \n", " project_id = project_id, \n", " region = region, \n", " staging_dir = staging_dir, \n", " requirements_file_path = requirements_file_path, \n", " args = json.dumps(['--output', f'{staging_dir}/wc/wordcount.out']),\n", " wait_interval = wait_interval)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Submit the pipeline for execution" ] }, { "cell_type": "code", "execution_count": 7, "metadata": { "tags": [ "skip-in-test" ] }, "outputs": [ { "data": { "text/html": [ "Experiment link here" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" }, { "data": { "text/html": [ "Run link here" ], "text/plain": [ "" ] }, "metadata": {}, "output_type": "display_data" } ], "source": [ "Client().create_run_from_pipeline_func(pipeline, arguments={})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Inspect the output" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [ "skip-in-test" ] }, "outputs": [], "source": [ "!gsutil cat $output/wc/wordcount.out" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## References\n", "* [Component python code](https://github.com/kubeflow/pipelines/blob/release-1.7/components/gcp/container/component_sdk/python/kfp_component/google/dataflow/_launch_python.py)\n", "* [Component docker file](https://github.com/kubeflow/pipelines/blob/release-1.7/components/gcp/container/Dockerfile)\n", "* [Sample notebook](https://github.com/kubeflow/pipelines/blob/release-1.7/components/gcp/dataflow/launch_python/sample.ipynb)\n", "* [Dataflow Python Quickstart](https://cloud.google.com/dataflow/docs/quickstarts/quickstart-python)" ] } ], "metadata": { "interpreter": { "hash": "c7a91a0fef823c7f839350126c5e355ea393d05f89cb40a046ebac9c8851a521" }, "kernelspec": { "display_name": "Python 3.7.10 64-bit ('v2': conda)", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" }, "pycharm": { "stem_cell": { "cell_type": "raw", "metadata": { "collapsed": false }, "source": [] } } }, "nbformat": 4, "nbformat_minor": 2 }