
Name

Gather data by querying BigQuery and save it to a table in BigQuery.

Labels

GCP, BigQuery, Kubeflow, Pipeline

Summary

A Kubeflow Pipeline component to submit a query to BigQuery and store the result in a table in BigQuery.

Details

Intended use

Use this Kubeflow component to:

  • Select data by submitting a query to BigQuery.
  • Output the data into a table in BigQuery.

Runtime arguments:

| Argument | Description | Optional | Data type | Accepted values | Default |
|----------|-------------|----------|-----------|-----------------|---------|
| query | The query used by BigQuery to fetch the results. | No | String | | |
| project_id | The project ID of the Google Cloud Platform (GCP) project that executes the query. | No | GCPProjectID | | |
| dataset_id | The ID of the persistent BigQuery dataset in which to store the query results. If the dataset does not exist, the operation creates a new one. | Yes | String | | None |
| table_id | The ID of the BigQuery table in which to store the query results. If the table ID is absent, the operation generates a random ID for the table. | Yes | String | | None |
| dataset_location | The location where the dataset is created. Defaults to US. | Yes | String | | US |
| job_config | The full configuration specification for the query job. See QueryJobConfig for details, and the sketch below for an example. | Yes | Dict | A JSON object with the same structure as QueryJobConfig | None |
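
For illustration, a minimal sketch of a job_config value follows. The field names (write_disposition, use_legacy_sql) are assumptions drawn from the QueryJobConfig object; the exact keys and nesting depend on how the component deserializes the dictionary, so verify them against the QueryJobConfig reference before relying on them.

import json

# Hypothetical job configuration: run standard SQL and overwrite the destination
# table if it already exists. Treat the key names as placeholders to verify.
JOB_CONFIG = json.dumps({
    'write_disposition': 'WRITE_TRUNCATE',
    'use_legacy_sql': False,
})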

Input data schema

The input data is a BigQuery job containing a query that pulls data from various sources.

Output:

The query results, stored in the BigQuery table identified by dataset_id and table_id.

Cautions & requirements

To use the component, the following requirements must be met:

  • The BigQuery API is enabled.
  • The component can authenticate to GCP. Refer to Authenticating Pipelines to GCP for details.
  • The Kubeflow user service account is a member of the roles/bigquery.admin role of the project.
  • The Kubeflow user service account is a member of the roles/storage.objectCreator role of the Cloud Storage output bucket (see the sketch after this list for example grant commands).
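
The following is a sketch of how those grants might be made from a notebook. The project ID, service account email, and bucket name are placeholders; substitute the values for your deployment.

# Placeholders: replace my-project, the service account email, and my-output-bucket.
!gcloud projects add-iam-policy-binding my-project --member=serviceAccount:kfp-user@my-project.iam.gserviceaccount.com --role=roles/bigquery.admin
!gsutil iam ch serviceAccount:kfp-user@my-project.iam.gserviceaccount.com:roles/storage.objectCreator gs://my-output-bucket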

Detailed description

This Kubeflow Pipeline component is used to:

  • Submit a query to BigQuery.
  • Persist the query results in a BigQuery table in the specified dataset.

Sample

Note: The following sample code works in an IPython notebook or directly in Python code.

Install the KFP SDK

%%capture --no-stderr

!pip3 install kfp --upgrade

Load the component using KFP SDK

import kfp.components as comp

bigquery_query_op = comp.load_component_from_url(
    'https://raw.githubusercontent.com/kubeflow/pipelines/1.7.0-rc.3/components/gcp/bigquery/query/to_table/component.yaml')
help(bigquery_query_op)

Query

In this sample, we submit a query that selects a few rows from the Stack Overflow public dataset and write the result to a BigQuery table.

QUERY = 'SELECT * FROM `bigquery-public-data.stackoverflow.posts_questions` LIMIT 10'

Set sample parameters

# Required Parameters
PROJECT_ID = '<Please put your project ID here>'
# Optional Parameters
EXPERIMENT_NAME = 'Bigquery-Query'
DATASET_ID = "TEST_DATASET"
TABLE_ID = "TEST_TABLE"

Run the component as a single pipeline

import kfp.dsl as dsl
import json
@dsl.pipeline(
    name='Bigquery query pipeline',
    description='Bigquery query pipeline'
)
def pipeline(
    query=QUERY, 
    project_id=PROJECT_ID, 
    dataset_id=DATASET_ID, 
    table_id=TABLE_ID, 
    dataset_location='US', 
    job_config=''
):
    bigquery_query_op(
        query=query, 
        project_id=project_id, 
        dataset_id=dataset_id, 
        table_id=table_id,  
        dataset_location=dataset_location, 
        job_config=job_config)

Compile the pipeline

pipeline_func = pipeline
pipeline_filename = pipeline_func.__name__ + '.zip'
import kfp.compiler as compiler
compiler.Compiler().compile(pipeline_func, pipeline_filename)

Submit the pipeline for execution

#Specify pipeline argument values
arguments = {}

#Get or create an experiment and submit a pipeline run
import kfp
client = kfp.Client()
experiment = client.create_experiment(EXPERIMENT_NAME)

#Submit a pipeline run
run_name = pipeline_func.__name__ + ' run'
run_result = client.run_pipeline(experiment.id, run_name, pipeline_filename, arguments)
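
Optionally (a sketch assuming a recent KFP v1 SDK, which provides create_run_from_pipeline_func), the compile and submit steps can be collapsed into a single call:

# Sketch: compile and submit the pipeline function in one step.
run_result = client.create_run_from_pipeline_func(
    pipeline_func,
    arguments=arguments,
    experiment_name=EXPERIMENT_NAME)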

Inspect the output

Find the created table under the specified dataset ID and table ID.
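
For example (a minimal sketch assuming the google-cloud-bigquery client library is installed and the run has completed successfully), you can confirm the table exists and preview a few rows:

from google.cloud import bigquery

# Use the same project and table coordinates as the pipeline run.
bq_client = bigquery.Client(project=PROJECT_ID)
table_ref = '{}.{}.{}'.format(PROJECT_ID, DATASET_ID, TABLE_ID)
table = bq_client.get_table(table_ref)  # raises NotFound if the table was not created
print('{} contains {} rows'.format(table_ref, table.num_rows))
for row in bq_client.list_rows(table, max_results=5):
    print(row)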

References

License

By deploying or using this software you agree to comply with the AI Hub Terms of Service and the Google APIs Terms of Service. To the extent of a direct conflict of terms, the AI Hub Terms of Service will control.