{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Name\n",
    "Data preparation by using a template to submit a job to Cloud Dataflow\n",
    "\n",
    "# Labels\n",
    "GCP, Cloud Dataflow, Kubeflow, Pipeline\n",
    "\n",
    "# Summary\n",
    "A Kubeflow Pipeline component to prepare data by using a template to submit a job to Cloud Dataflow.\n",
    "\n",
    "# Details\n",
    "\n",
    "## Intended use\n",
    "Use this component when you have a pre-built Cloud Dataflow template and want to launch it as a step in a Kubeflow Pipeline.\n",
    "\n",
    "## Runtime arguments\n",
    "Argument | Description | Optional | Data type | Accepted values | Default |\n",
    ":--- | :---------- | :---------- | :---------- | :---------- | :---------- |\n",
    "project_id | The ID of the Google Cloud Platform (GCP) project to which the job belongs. | No | GCPProjectID | | |\n",
    "gcs_path | The path to a Cloud Storage bucket containing the job creation template. It must be a valid Cloud Storage URL beginning with `gs://`. | No | GCSPath | | |\n",
    "launch_parameters | The parameters that are required to launch the template. The schema is defined in [LaunchTemplateParameters](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/LaunchTemplateParameters). The parameter `jobName` is replaced by a generated name. | Yes | Dict | A JSON object with the same structure as [LaunchTemplateParameters](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/LaunchTemplateParameters) | None |\n",
    "location | The regional endpoint to which the job request is directed. | Yes | GCPRegion | | None |\n",
    "staging_dir | The path to the Cloud Storage directory where the staging files are stored. A random subdirectory is created under the staging directory to keep the job information, so that the job can be resumed in case of failure. | Yes | GCSPath | | None |\n",
    "validate_only | If True, the request is validated but not executed. | Yes | Boolean | | False |\n",
    "wait_interval | The number of seconds to wait between calls to get the status of the job. | Yes | Integer | | 30 |\n",
    "\n",
    "## Input data schema\n",
    "\n",
    "The input `gcs_path` must contain a valid Cloud Dataflow template. The template can be created by following the instructions in [Creating Templates](https://cloud.google.com/dataflow/docs/guides/templates/creating-templates). You can also use [Google-provided templates](https://cloud.google.com/dataflow/docs/guides/templates/provided-templates).\n",
    "\n",
    "## Output\n",
    "Name | Description\n",
    ":--- | :----------\n",
    "job_id | The ID of the Cloud Dataflow job that is created.\n",
    "\n",
    "## Caution & requirements\n",
    "\n",
    "To use the component, the following requirements must be met:\n",
    "- The Cloud Dataflow API is enabled.\n",
    "- The component can authenticate to GCP. Refer to [Authenticating Pipelines to GCP](https://www.kubeflow.org/docs/gke/authentication-pipelines/) for details.\n",
    "- The Kubeflow user service account is a member of:\n",
    "    - `roles/dataflow.developer` role of the project.\n",
    "    - `roles/storage.objectViewer` role of the Cloud Storage object `gcs_path`.\n",
    "    - `roles/storage.objectCreator` role of the Cloud Storage object `staging_dir`.\n",
    "\n",
    "## Detailed description\n",
    "You can execute the template locally by following the instructions in [Executing Templates](https://cloud.google.com/dataflow/docs/guides/templates/executing-templates). See the sample code below to learn how to execute the template from a pipeline.\n",
    "Follow these steps to use the component in a pipeline:\n",
    "1. Install the Kubeflow Pipelines SDK:\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%capture --no-stderr\n",
    "\n",
    "!pip3 install kfp --upgrade"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "2. Load the component by using the KFP SDK:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp.components as comp\n",
    "\n",
    "dataflow_template_op = comp.load_component_from_url(\n",
    "    'https://raw.githubusercontent.com/kubeflow/pipelines/1.7.0-rc.3/components/gcp/dataflow/launch_template/component.yaml')\n",
    "help(dataflow_template_op)"
   ]
  },
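  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The `launch_parameters` argument follows the [LaunchTemplateParameters](https://cloud.google.com/dataflow/docs/reference/rest/v1b3/LaunchTemplateParameters) schema described in the runtime arguments table above. Below is a minimal sketch of building one for the Word Count template; the output bucket path is a placeholder you must replace, and `jobName` is omitted because the component replaces it with a generated name."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "\n",
    "# A minimal sketch of LaunchTemplateParameters: 'parameters' carries the\n",
    "# template-specific runtime parameters. Replace the output bucket with yours.\n",
    "example_launch_parameters = json.dumps({\n",
    "    'parameters': {\n",
    "        'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',\n",
    "        'output': 'gs://<your-bucket>/out/wc'\n",
    "    }\n",
    "})"
   ]
  },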
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Sample\n",
    "\n",
    "Note: The following sample code works in an IPython notebook or directly in Python code.\n",
    "In this sample, we run a Google-provided word count template from `gs://dataflow-templates/latest/Word_Count`. The template takes a text file as input and outputs word counts to a Cloud Storage bucket. Here is the sample input:"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!gsutil cat gs://dataflow-samples/shakespeare/kinglear.txt"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Set sample parameters"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "tags": [
     "parameters"
    ]
   },
   "outputs": [],
   "source": [
    "# Required parameters\n",
    "PROJECT_ID = '<Please put your project ID here>'\n",
    "GCS_WORKING_DIR = 'gs://<Please put your GCS path here>'  # No ending slash"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optional parameters\n",
    "EXPERIMENT_NAME = 'Dataflow - Launch Template'\n",
    "OUTPUT_PATH = '{}/out/wc'.format(GCS_WORKING_DIR)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Example pipeline that uses the component"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import json\n",
    "\n",
    "import kfp.dsl as dsl\n",
    "\n",
    "\n",
    "@dsl.pipeline(\n",
    "    name='Dataflow launch template pipeline',\n",
    "    description='Dataflow launch template pipeline'\n",
    ")\n",
    "def pipeline(\n",
    "        project_id=PROJECT_ID,\n",
    "        gcs_path='gs://dataflow-templates/latest/Word_Count',\n",
    "        launch_parameters=json.dumps({\n",
    "            'parameters': {\n",
    "                'inputFile': 'gs://dataflow-samples/shakespeare/kinglear.txt',\n",
    "                'output': OUTPUT_PATH\n",
    "            }\n",
    "        }),\n",
    "        location='',\n",
    "        validate_only='False',\n",
    "        staging_dir=GCS_WORKING_DIR,\n",
    "        wait_interval=30):\n",
    "    dataflow_template_op(\n",
    "        project_id=project_id,\n",
    "        gcs_path=gcs_path,\n",
    "        launch_parameters=launch_parameters,\n",
    "        location=location,\n",
    "        validate_only=validate_only,\n",
    "        staging_dir=staging_dir,\n",
    "        wait_interval=wait_interval)"
   ]
  },
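  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "As listed in the Output table above, the component exposes the created job's ID as the `job_id` output. Below is a minimal sketch of reading it in a pipeline; it assumes the KFP v1 `.outputs` accessor, and the downstream use is left as a comment because no consumer component is defined in this sample."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "@dsl.pipeline(\n",
    "    name='Dataflow job id example',\n",
    "    description='Sketch: reading the job_id output of the component'\n",
    ")\n",
    "def job_id_pipeline(\n",
    "        project_id=PROJECT_ID,\n",
    "        gcs_path='gs://dataflow-templates/latest/Word_Count'):\n",
    "    dataflow_task = dataflow_template_op(\n",
    "        project_id=project_id,\n",
    "        gcs_path=gcs_path)\n",
    "    # The created job's ID is available to downstream steps as:\n",
    "    job_id = dataflow_task.outputs['job_id']\n",
    "    # e.g. a hypothetical consumer_op(job_id=job_id) could take it as input."
   ]
  },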
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Compile the pipeline"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp.compiler as compiler\n",
    "\n",
    "pipeline_func = pipeline\n",
    "pipeline_filename = pipeline_func.__name__ + '.zip'\n",
    "compiler.Compiler().compile(pipeline_func, pipeline_filename)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Submit the pipeline for execution"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import kfp\n",
    "\n",
    "# Specify pipeline argument values\n",
    "arguments = {}\n",
    "\n",
    "# Get or create an experiment\n",
    "client = kfp.Client()\n",
    "experiment = client.create_experiment(EXPERIMENT_NAME)\n",
    "\n",
    "# Submit a pipeline run\n",
    "run_name = pipeline_func.__name__ + ' run'\n",
    "run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)"
   ]
  },
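  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Optionally, you can block the notebook until the run finishes before inspecting the output. The sketch below assumes the KFP v1 client's `wait_for_run_completion` method; the one-hour timeout is an illustrative choice."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Optionally wait for the run to finish (timeout is in seconds;\n",
    "# one hour here is an arbitrary illustrative choice).\n",
    "run_detail = client.wait_for_run_completion(run_result.id, timeout=3600)\n",
    "print(run_detail.run.status)"
   ]
  },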
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "#### Inspect the output"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "!gsutil cat $OUTPUT_PATH*"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## References\n",
    "\n",
    "* [Component Python code](https://github.com/kubeflow/pipelines/blob/master/components/gcp/container/component_sdk/python/kfp_component/google/dataflow/_launch_template.py)\n",
    "* [Component Dockerfile](https://github.com/kubeflow/pipelines/blob/master/components/gcp/container/Dockerfile)\n",
    "* [Sample notebook](https://github.com/kubeflow/pipelines/blob/master/components/gcp/dataflow/launch_template/sample.ipynb)\n",
    "* [Cloud Dataflow Templates overview](https://cloud.google.com/dataflow/docs/guides/templates/overview)\n",
    "\n",
    "## License\n",
    "By deploying or using this software you agree to comply with the [AI Hub Terms of Service](https://aihub.cloud.google.com/u/0/aihub-tos) and the [Google APIs Terms of Service](https://developers.google.com/terms/). To the extent of a direct conflict of terms, the AI Hub Terms of Service will control.\n"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.4"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}