{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# GCP Dataflow Component Sample\n",
|
|
"A Kubeflow Pipeline component that prepares data by submitting an Apache Beam job (authored in Python) to Cloud Dataflow for execution. The Python Beam code is run with Cloud Dataflow Runner.\n",
|
|
"\n",
|
|
"## Intended use\n",
|
|
"\n",
|
|
"Use this component to run a Python Beam code to submit a Cloud Dataflow job as a step of a Kubeflow pipeline. \n",
|
|
"\n",
|
|
"## Runtime arguments\n",
|
|
"Name | Description | Optional | Data type| Accepted values | Default |\n",
|
|
":--- | :----------| :----------| :----------| :----------| :---------- |\n",
|
|
"python_file_path | The path to the Cloud Storage bucket or local directory containing the Python file to be run. | | GCSPath | | |\n",
|
|
"project_id | The ID of the Google Cloud Platform (GCP) project containing the Cloud Dataflow job.| | String | | |\n",
|
|
"region | The Google Cloud Platform (GCP) region to run the Cloud Dataflow job.| | String | | |\n",
|
|
"staging_dir | The path to the Cloud Storage directory where the staging files are stored. A random subdirectory will be created under the staging directory to keep the job information.This is done so that you can resume the job in case of failure. `staging_dir` is passed as the command line arguments (`staging_location` and `temp_location`) of the Beam code. | Yes | GCSPath | | None |\n",
|
|
"requirements_file_path | The path to the Cloud Storage bucket or local directory containing the pip requirements file. | Yes | GCSPath | | None |\n",
|
|
"args | The list of arguments to pass to the Python file. | No | List | A list of string arguments | None |\n",
|
|
"wait_interval | The number of seconds to wait between calls to get the status of the job. | Yes | Integer | | 30 |\n",
"\n",
"## Input data schema\n",
"\n",
"Before you use the component, the following files must be ready in a Cloud Storage bucket:\n",
"- A Beam Python code file.\n",
"- A `requirements.txt` file which includes a list of dependent packages.\n",
"\n",
"The Beam Python code should follow the [Beam programming guide](https://beam.apache.org/documentation/programming-guide/) as well as the following additional requirements to be compatible with this component:\n",
"- It accepts the command line arguments `--project`, `--region`, `--temp_location`, and `--staging_location`, which are [standard Dataflow Runner options](https://cloud.google.com/dataflow/docs/guides/specifying-exec-params#setting-other-cloud-pipeline-options).\n",
"- It enables info-level logging before the Cloud Dataflow job starts, so that the component can track the status and ID of the job that is created. For example, call `logging.getLogger().setLevel(logging.INFO)` before any other code, as shown in the sketch below.\n",
"\n",
"\n",
"## Output\n",
"Name | Description\n",
":--- | :----------\n",
"job_id | The ID of the Cloud Dataflow job that is created.\n",
"\n",
"## Cautions & requirements\n",
"To use the component, the following requirements must be met:\n",
"- Cloud Dataflow API is enabled.\n",
"- The component runs under a Kubeflow user service account (configured as a secret) in a Kubeflow Pipelines cluster. For example:\n",
"```\n",
"component_op(...)\n",
"```\n",
"The Kubeflow user service account is a member of:\n",
"- `roles/dataflow.developer` role on the project.\n",
"- `roles/storage.objectViewer` role on the Cloud Storage objects `python_file_path` and `requirements_file_path`.\n",
"- `roles/storage.objectCreator` role on the Cloud Storage object `staging_dir`.\n",
"\n",
"## Detailed description\n",
|
|
"The component does several things during the execution:\n",
|
|
"- Downloads `python_file_path` and `requirements_file_path` to local files.\n",
|
|
"- Starts a subprocess to launch the Python program.\n",
|
|
"- Monitors the logs produced from the subprocess to extract the Cloud Dataflow job information.\n",
|
|
"- Stores the Cloud Dataflow job information in `staging_dir` so the job can be resumed in case of failure.\n",
|
|
"- Waits for the job to finish.\n",
|
|
"\n",
|
|
"# Setup"
|
|
]
|
|
},
|
|
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"name": "#%%\n"
},
"tags": [
"parameters"
]
},
"outputs": [],
"source": [
"project = 'Input your PROJECT ID'\n",
"region = 'Input GCP region' # For example, 'us-central1'\n",
"output = 'Input your GCS bucket name' # No ending slash"
]
},
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%% md\n"
}
},
"source": [
"## Install Pipeline SDK"
]
},
{
"cell_type": "code",
"execution_count": 2,
"metadata": {
"tags": [
"skip-in-test"
]
},
"outputs": [],
"source": [
"!python3 -m pip install 'kfp>=0.1.31' --quiet"
]
},
{
"cell_type": "markdown",
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"source": [
"\n",
"## Load the component using KFP SDK\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"pycharm": {
"name": "#%%\n"
}
},
"outputs": [],
"source": [
"import kfp.deprecated.components as comp\n",
"\n",
"dataflow_python_op = comp.load_component_from_url(\n",
"    'https://raw.githubusercontent.com/kubeflow/pipelines/1.7.0-rc.3/components/gcp/dataflow/launch_python/component.yaml')"
]
},
{
"cell_type": "code",
"execution_count": 11,
"metadata": {
"tags": [
"skip-in-test"
]
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"Help on function Launch Python:\n",
"\n",
"Launch Python(python_file_path: str, project_id: str, region: str, staging_dir: 'GCSPath' = '', requirements_file_path: 'GCSPath' = '', args: list = '[]', wait_interval: int = '30')\n",
" Launch Python\n",
" Launch a self-executing beam python file.\n",
"\n"
]
}
],
"source": [
"help(dataflow_python_op)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Use the wordcount Python sample\n",
"In this sample, we run the wordcount sample code in a Kubeflow pipeline. The output will be stored in a Cloud Storage bucket. Here is the sample code:"
]
},
{
"cell_type": "code",
"execution_count": 4,
"metadata": {
"tags": [
"skip-in-test"
]
},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
"#\r\n",
|
|
"# Licensed to the Apache Software Foundation (ASF) under one or more\r\n",
|
|
"# contributor license agreements. See the NOTICE file distributed with\r\n",
|
|
"# this work for additional information regarding copyright ownership.\r\n",
|
|
"# The ASF licenses this file to You under the Apache License, Version 2.0\r\n",
|
|
"# (the \"License\"); you may not use this file except in compliance with\r\n",
|
|
"# the License. You may obtain a copy of the License at\r\n",
|
|
"#\r\n",
|
|
"# http://www.apache.org/licenses/LICENSE-2.0\r\n",
|
|
"#\r\n",
|
|
"# Unless required by applicable law or agreed to in writing, software\r\n",
|
|
"# distributed under the License is distributed on an \"AS IS\" BASIS,\r\n",
|
|
"# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.\r\n",
|
|
"# See the License for the specific language governing permissions and\r\n",
|
|
"# limitations under the License.\r\n",
|
|
"#\r\n",
|
|
"\r\n",
|
|
"\"\"\"A minimalist word-counting workflow that counts words in Shakespeare.\r\n",
|
|
"\r\n",
|
|
"This is the first in a series of successively more detailed 'word count'\r\n",
|
|
"examples.\r\n",
|
|
"\r\n",
|
|
"Next, see the wordcount pipeline, then the wordcount_debugging pipeline, for\r\n",
|
|
"more detailed examples that introduce additional concepts.\r\n",
|
|
"\r\n",
|
|
"Concepts:\r\n",
|
|
"\r\n",
|
|
"1. Reading data from text files\r\n",
|
|
"2. Specifying 'inline' transforms\r\n",
|
|
"3. Counting a PCollection\r\n",
|
|
"4. Writing data to Cloud Storage as text files\r\n",
|
|
"\r\n",
|
|
"To execute this pipeline locally, first edit the code to specify the output\r\n",
|
|
"location. Output location could be a local file path or an output prefix\r\n",
|
|
"on GCS. (Only update the output location marked with the first CHANGE comment.)\r\n",
|
|
"\r\n",
|
|
"To execute this pipeline remotely, first edit the code to set your project ID,\r\n",
|
|
"runner type, the staging location, the temp location, and the output location.\r\n",
|
|
"The specified GCS bucket(s) must already exist. (Update all the places marked\r\n",
|
|
"with a CHANGE comment.)\r\n",
|
|
"\r\n",
|
|
"Then, run the pipeline as described in the README. It will be deployed and run\r\n",
|
|
"using the Google Cloud Dataflow Service. No args are required to run the\r\n",
|
|
"pipeline. You can see the results in your output bucket in the GCS browser.\r\n",
|
|
"\"\"\"\r\n",
|
|
"\r\n",
|
|
"from __future__ import absolute_import\r\n",
|
|
"\r\n",
|
|
"import argparse\r\n",
|
|
"import logging\r\n",
|
|
"import re\r\n",
|
|
"\r\n",
|
|
"from past.builtins import unicode\r\n",
|
|
"\r\n",
|
|
"import apache_beam as beam\r\n",
|
|
"from apache_beam.io import ReadFromText\r\n",
|
|
"from apache_beam.io import WriteToText\r\n",
|
|
"from apache_beam.options.pipeline_options import PipelineOptions\r\n",
|
|
"from apache_beam.options.pipeline_options import SetupOptions\r\n",
|
|
"\r\n",
|
|
"\r\n",
|
|
"def run(argv=None):\r\n",
|
|
" \"\"\"Main entry point; defines and runs the wordcount pipeline.\"\"\"\r\n",
|
|
"\r\n",
|
|
" parser = argparse.ArgumentParser()\r\n",
|
|
" parser.add_argument('--input',\r\n",
|
|
" dest='input',\r\n",
|
|
" default='gs://dataflow-samples/shakespeare/kinglear.txt',\r\n",
|
|
" help='Input file to process.')\r\n",
|
|
" parser.add_argument('--output',\r\n",
|
|
" dest='output',\r\n",
|
|
" # CHANGE 1/6: The Google Cloud Storage path is required\r\n",
|
|
" # for outputting the results.\r\n",
|
|
" default='gs://YOUR_OUTPUT_BUCKET/AND_OUTPUT_PREFIX',\r\n",
|
|
" help='Output file to write results to.')\r\n",
|
|
" known_args, pipeline_args = parser.parse_known_args(argv)\r\n",
|
|
" # pipeline_args.extend([\r\n",
|
|
" # # CHANGE 2/6: (OPTIONAL) Change this to DataflowRunner to\r\n",
|
|
" # # run your pipeline on the Google Cloud Dataflow Service.\r\n",
|
|
" # '--runner=DirectRunner',\r\n",
|
|
" # # CHANGE 3/6: Your project ID is required in order to run your pipeline on\r\n",
|
|
" # # the Google Cloud Dataflow Service.\r\n",
|
|
" # '--project=SET_YOUR_PROJECT_ID_HERE',\r\n",
|
|
" # # CHANGE 4/6: A GCP region is required in order to run your pipeline on\r\n",
|
|
" # # the Google Cloud Dataflow Service.\r\n",
|
|
" # '--region=SET_GCP_REGION_HERE',\r\n",
|
|
" # # CHANGE 5/6: Your Google Cloud Storage path is required for staging local\r\n",
|
|
" # # files.\r\n",
|
|
" # '--staging_location=gs://YOUR_BUCKET_NAME/AND_STAGING_DIRECTORY',\r\n",
|
|
" # # CHANGE 6/6: Your Google Cloud Storage path is required for temporary\r\n",
|
|
" # # files.\r\n",
|
|
" # '--temp_location=gs://YOUR_BUCKET_NAME/AND_TEMP_DIRECTORY',\r\n",
|
|
" # '--job_name=your-wordcount-job',\r\n",
|
|
" # ])\r\n",
|
|
"\r\n",
|
|
" # We use the save_main_session option because one or more DoFn's in this\r\n",
|
|
" # workflow rely on global context (e.g., a module imported at module level).\r\n",
|
|
" pipeline_options = PipelineOptions(pipeline_args)\r\n",
|
|
" pipeline_options.view_as(SetupOptions).save_main_session = True\r\n",
|
|
" with beam.Pipeline(options=pipeline_options) as p:\r\n",
|
|
"\r\n",
|
|
" # Read the text file[pattern] into a PCollection.\r\n",
|
|
" lines = p | ReadFromText(known_args.input)\r\n",
|
|
"\r\n",
|
|
" # Count the occurrences of each word.\r\n",
|
|
" counts = (\r\n",
|
|
" lines\r\n",
|
|
" | 'Split' >> (beam.FlatMap(lambda x: re.findall(r'[A-Za-z\\']+', x))\r\n",
|
|
" .with_output_types(unicode))\r\n",
|
|
" | 'PairWithOne' >> beam.Map(lambda x: (x, 1))\r\n",
|
|
" | 'GroupAndSum' >> beam.CombinePerKey(sum))\r\n",
|
|
"\r\n",
|
|
" # Format the counts into a PCollection of strings.\r\n",
|
|
" def format_result(word_count):\r\n",
|
|
" (word, count) = word_count\r\n",
|
|
" return '%s: %s' % (word, count)\r\n",
|
|
"\r\n",
|
|
" output = counts | 'Format' >> beam.Map(format_result)\r\n",
|
|
"\r\n",
|
|
" # Write the output using a \"Write\" transform that has side effects.\r\n",
|
|
" # pylint: disable=expression-not-assigned\r\n",
|
|
" output | WriteToText(known_args.output)\r\n",
|
|
"\r\n",
|
|
"\r\n",
|
|
"if __name__ == '__main__':\r\n",
|
|
" logging.getLogger().setLevel(logging.INFO)\r\n",
|
|
" run()\r\n"
|
|
]
|
|
}
|
|
],
"source": [
"!gsutil cat gs://ml-pipeline/sample-pipeline/word-count/wc.py"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Example pipeline that uses the component"
]
},
{
"cell_type": "code",
"execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
"import kfp.deprecated as kfp\n",
"from kfp.deprecated import dsl, Client\n",
"import json\n",
"@dsl.pipeline(\n",
"    name='dataflow-launch-python-pipeline',\n",
"    description='Dataflow launch python pipeline'\n",
")\n",
"def pipeline(\n",
"    python_file_path = 'gs://ml-pipeline/sample-pipeline/word-count/wc.py',\n",
"    project_id = project,\n",
"    region = region,\n",
"    staging_dir = output,\n",
"    requirements_file_path = 'gs://ml-pipeline/sample-pipeline/word-count/requirements.txt',\n",
"    wait_interval = 30\n",
"):\n",
"    dataflow_python_op(\n",
"        python_file_path = python_file_path, \n",
"        project_id = project_id, \n",
"        region = region, \n",
"        staging_dir = staging_dir, \n",
"        requirements_file_path = requirements_file_path, \n",
"        args = json.dumps(['--output', f'{staging_dir}/wc/wordcount.out']),\n",
"        wait_interval = wait_interval)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Submit the pipeline for execution"
]
},
{
"cell_type": "code",
"execution_count": 7,
"metadata": {
"tags": [
"skip-in-test"
]
},
"outputs": [
|
|
{
|
|
"data": {
|
|
"text/html": [
|
|
"Experiment link <a href=\"/pipeline/#/experiments/details/e537d6a6-f49e-4e20-a78f-e33fdafda05c\" target=\"_blank\" >here</a>"
|
|
],
|
|
"text/plain": [
|
|
"<IPython.core.display.HTML object>"
|
|
]
|
|
},
|
|
"metadata": {},
|
|
"output_type": "display_data"
|
|
},
|
|
{
|
|
"data": {
|
|
"text/html": [
|
|
"Run link <a href=\"/pipeline/#/runs/details/0ca2875e-d580-11e9-a9f0-42010a8001f3\" target=\"_blank\" >here</a>"
|
|
],
|
|
"text/plain": [
|
|
"<IPython.core.display.HTML object>"
|
|
]
|
|
},
|
|
"metadata": {},
|
|
"output_type": "display_data"
|
|
}
|
|
],
|
|
"source": [
|
|
"Client().create_run_from_pipeline_func(pipeline, arguments={})"
|
|
]
|
|
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Inspect the output"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": [
"skip-in-test"
]
},
"outputs": [],
"source": [
"!gsutil cat $output/wc/wordcount.out"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## References\n",
"* [Component python code](https://github.com/kubeflow/pipelines/blob/release-1.7/components/gcp/container/component_sdk/python/kfp_component/google/dataflow/_launch_python.py)\n",
"* [Component docker file](https://github.com/kubeflow/pipelines/blob/release-1.7/components/gcp/container/Dockerfile)\n",
"* [Sample notebook](https://github.com/kubeflow/pipelines/blob/release-1.7/components/gcp/dataflow/launch_python/sample.ipynb)\n",
"* [Dataflow Python Quickstart](https://cloud.google.com/dataflow/docs/quickstarts/quickstart-python)"
]
}
],
"metadata": {
"interpreter": {
"hash": "c7a91a0fef823c7f839350126c5e355ea393d05f89cb40a046ebac9c8851a521"
},
"kernelspec": {
"display_name": "Python 3.7.10 64-bit ('v2': conda)",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.10"
},
"pycharm": {
"stem_cell": {
"cell_type": "raw",
"metadata": {
"collapsed": false
},
"source": []
}
}
},
"nbformat": 4,
"nbformat_minor": 2
}