chore(components): Move Dataflow Flex Template components to preview

PiperOrigin-RevId: 542077307
This commit is contained in:
Googler 2023-06-20 16:08:33 -07:00 committed by Google Cloud Pipeline Components maintainers
parent bc7b65207e
commit a2a359aa9a
7 changed files with 670 additions and 0 deletions

View File

@ -0,0 +1,14 @@
# Copyright 2023 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Google Cloud Pipeline Dataflow components."""

View File

@ -0,0 +1,14 @@
# Copyright 2023 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Google Cloud Pipeline Components - Dataflow Flex Template Launcher and Remote Runner."""

View File

@ -0,0 +1,56 @@
# Copyright 2023 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Dataflow Flex Template launcher."""
import logging
import sys
from google_cloud_pipeline_components.container.preview.dataflow.flex_template import remote_runner
from google_cloud_pipeline_components.container.v1.gcp_launcher.utils import parser_util
def _parse_args(args):
"""Parse command line arguments."""
_, parsed_args = parser_util.parse_default_args(args)
return vars(parsed_args)
def main(argv):
"""Main entry.
Expected input args are as follows:
Project - Required. The project of which the resource will be launched.
Region - Required. The region of which the resource will be launched.
Type - Required. GCP launcher is a single container. This Enum will
specify which resource to be launched.
Request payload - Required. The full serialized json of the resource spec.
Note this can contain the Pipeline Placeholders.
gcp_resources - placeholder output for returning job_id.
Args:
argv: A list of system arguments.
"""
parsed_args = _parse_args(argv)
job_type = parsed_args['type']
if job_type != 'DataflowJob':
raise ValueError('Incorrect job type: ' + job_type)
logging.info('Job started for type: %s', job_type)
remote_runner.launch_flex_template(**parsed_args)
if __name__ == '__main__':
main(sys.argv[1:])
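
For reference, a minimal sketch of invoking the launcher module directly. The flag names mirror the component's args list later in this commit; the project ID, payload, and output path are hypothetical placeholders, and this assumes parser_util.parse_default_args accepts exactly these flags:

# Hypothetical local invocation of the launcher (illustration only).
from google_cloud_pipeline_components.container.preview.dataflow.flex_template import launcher

launcher.main([
    '--type', 'DataflowJob',
    '--project', 'my-project',        # placeholder project ID
    '--location', 'us-central1',
    '--payload', '{"launch_parameter": {"container_spec_gcs_path": "gs://my-bucket/templates/spec.json"}}',
    '--gcp_resources', '/tmp/gcp_resources.json',
])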

View File

@ -0,0 +1,280 @@
# Copyright 2023 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""GCP launcher for Dataflow Flex Templates."""
import datetime
import json
import logging
import os
from os import path
import re
from typing import Any, Dict, Union
import uuid
import google.auth.transport.requests
from google_cloud_pipeline_components.container.v1.gcp_launcher.utils import gcp_labels_util
from google_cloud_pipeline_components.container.v1.gcp_launcher.utils import json_util
from google_cloud_pipeline_components.proto import gcp_resources_pb2
import requests
from requests.adapters import HTTPAdapter
from requests.sessions import Session
from urllib3.util.retry import Retry
from google.protobuf import json_format
_CONNECTION_ERROR_RETRY_LIMIT = 5
_CONNECTION_RETRY_BACKOFF_FACTOR = 2.0
_DATAFLOW_URI_PREFIX = 'https://dataflow.googleapis.com/v1b3'
_DATAFLOW_JOB_URI_TEMPLATE = rf'({_DATAFLOW_URI_PREFIX}/projects/(?P<project>.*)/locations/(?P<location>.*)/jobs/(?P<job>.*))'


def insert_system_labels_into_payload(payload):
  """Merges system labels into the payload's additional_user_labels."""
  job_spec = json.loads(payload)
  try:
    labels = job_spec['launch_parameter']['environment'][
        'additional_user_labels'
    ]
  except KeyError:
    labels = {}

  if 'launch_parameter' not in job_spec.keys():
    job_spec['launch_parameter'] = {}
  if 'environment' not in job_spec['launch_parameter'].keys():
    job_spec['launch_parameter']['environment'] = {}
  if (
      'additional_user_labels'
      not in job_spec['launch_parameter']['environment'].keys()
  ):
    job_spec['launch_parameter']['environment']['additional_user_labels'] = {}

  labels = gcp_labels_util.attach_system_labels(labels)
  job_spec['launch_parameter']['environment']['additional_user_labels'] = labels
  return json.dumps(job_spec)


class DataflowFlexTemplateRemoteRunner:
  """Common module for creating Dataflow Flex Template jobs."""

  def __init__(
      self,
      type: str,
      project: str,
      location: str,
      gcp_resources: str,
  ):
    """Initializes a DataflowFlexTemplateRemoteRunner object."""
    self._type = type
    self._project = project
    self._location = location
    self._creds, _ = google.auth.default()
    self._gcp_resources = gcp_resources
    self._session = self._get_session()

  def _get_session(self) -> Session:
    """Gets an HTTP session with retries for transient errors."""
    retry = Retry(
        total=_CONNECTION_ERROR_RETRY_LIMIT,
        status_forcelist=[429, 503],
        backoff_factor=_CONNECTION_RETRY_BACKOFF_FACTOR,
        allowed_methods=['GET', 'POST'],
    )
    adapter = HTTPAdapter(max_retries=retry)
    session = requests.Session()
    session.headers.update({
        'Content-Type': 'application/json',
        'User-Agent': 'google-cloud-pipeline-components',
    })
    session.mount('https://', adapter)
    return session

  def _post_resource(self, url: str, post_data: str) -> Dict[str, Any]:
    """Sends an HTTP POST request.

    Args:
      url: The resource url.
      post_data: The POST data.

    Returns:
      Dict of the JSON payload returned in the HTTP response.

    Raises:
      RuntimeError: Failed to get or parse the HTTP response.
    """
    if not self._creds.valid:
      self._creds.refresh(google.auth.transport.requests.Request())
    headers = {'Authorization': 'Bearer ' + self._creds.token}

    result = self._session.post(url=url, data=post_data, headers=headers)
    json_data = {}
    try:
      json_data = result.json()
      result.raise_for_status()
      return json_data
    except requests.exceptions.HTTPError as err:
      try:
        err_msg = (
            'Dataflow service returned HTTP status {} from POST: {}. Status:'
            ' {}, Message: {}'.format(
                err.response.status_code,
                err.request.url,
                json_data['error']['status'],
                json_data['error']['message'],
            )
        )
      except (KeyError, TypeError):
        err_msg = err.response.text
      # Raise RuntimeError with the error returned from the Dataflow service.
      # Suppress HTTPError as it provides no actionable feedback.
      raise RuntimeError(err_msg) from None
    except json.decoder.JSONDecodeError as err:
      raise RuntimeError(
          'Failed to decode JSON from response:\n{}'.format(err.doc)
      ) from err

  def check_if_job_exists(self) -> Union[str, None]:
    """Checks if a Dataflow job already exists for this task.

    Returns:
      The Job resource uri recorded by a previous run if it exists, otherwise
      None. For the Job resource format, see:
      https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job

    Raises:
      ValueError: Job resource uri format is invalid.
    """
    if (
        path.exists(self._gcp_resources)
        and os.stat(self._gcp_resources).st_size != 0
    ):
      with open(self._gcp_resources) as f:
        serialized_gcp_resources = f.read()

      job_resources = json_format.Parse(
          serialized_gcp_resources, gcp_resources_pb2.GcpResources()
      )
      # Resources should only contain one item.
      if len(job_resources.resources) != 1:
        raise ValueError(
            'gcp_resources should contain one resource, found'
            f' {len(job_resources.resources)}'
        )

      # Validate the format of the Job resource uri.
      job_name_pattern = re.compile(_DATAFLOW_JOB_URI_TEMPLATE)
      match = job_name_pattern.match(job_resources.resources[0].resource_uri)
      try:
        matched_project = match.group('project')
        matched_location = match.group('location')
        matched_job_id = match.group('job')
      except AttributeError as err:
        raise ValueError(
            'Invalid Resource uri: {}. Expect: {}.'.format(
                job_resources.resources[0].resource_uri,
                'https://dataflow.googleapis.com/v1b3/projects/[projectId]/locations/[location]/jobs/[jobId]',
            )
        ) from err

      # Return the existing Job resource uri.
      return job_resources.resources[0].resource_uri

    return None

  def create_job(
      self,
      type: str,
      job_request: Dict[str, Any],
  ) -> None:
    """Creates a job using a Dataflow Flex Template.

    Args:
      type: Job type that is written to gcp_resources.
      job_request: A json serialized Job resource. For more details, see:
        https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job

    Raises:
      RuntimeError: The Job resource uri cannot be constructed by parsing the
        response.

    Returns:
      None
    """
    launch_job_url = f'https://dataflow.googleapis.com/v1b3/projects/{self._project}/locations/{self._location}/flexTemplates:launch'
    response = self._post_resource(launch_job_url, json.dumps(job_request))

    if not job_request.get('validate_only', False):
      try:
        job = response['job']
        job_uri = f"{_DATAFLOW_URI_PREFIX}/projects/{job['projectId']}/locations/{job['location']}/jobs/{job['id']}"
      except KeyError as err:
        raise RuntimeError(
            'Dataflow Flex Template launch failed. '
            'Cannot determine the job resource uri from the response:\n'
            f'{response}'
        ) from err

      # Write the Job resource to the gcp_resources output file.
      job_resources = gcp_resources_pb2.GcpResources()
      job_resource = job_resources.resources.add()
      job_resource.resource_type = type
      job_resource.resource_uri = job_uri
      with open(self._gcp_resources, 'w') as f:
        f.write(json_format.MessageToJson(job_resources))
      logging.info('Created Dataflow job: %s', job_uri)
    else:
      logging.info('No Dataflow job is created for request validation.')


def launch_flex_template(
    type: str,
    project: str,
    location: str,
    payload: str,
    gcp_resources: str,
) -> None:
  """Main function for launching a Dataflow Flex Template.

  Args:
    type: Job type that is written to gcp_resources.
    project: Project in which to launch the job.
    location: Location in which to launch the job.
    payload: A json serialized Job resource. For more details, see:
      https://cloud.google.com/dataflow/docs/reference/rest/v1b3/projects.jobs#Job
    gcp_resources: File path for storing the `gcp_resources` output parameter.

  Returns:
    None
  """
  try:
    job_spec = json_util.recursive_remove_empty(
        json.loads(insert_system_labels_into_payload(payload), strict=False)
    )
  except json.decoder.JSONDecodeError as err:
    raise RuntimeError(
        'Failed to decode JSON from payload: {}'.format(err.doc)
    ) from err

  remote_runner = DataflowFlexTemplateRemoteRunner(
      type, project, location, gcp_resources
  )
  if not remote_runner.check_if_job_exists():
    if 'launch_parameter' not in job_spec.keys():
      job_spec['launch_parameter'] = {}
    if 'job_name' not in job_spec['launch_parameter'].keys():
      now = datetime.datetime.now().strftime('%Y%m%d%H%M%S')
      job_spec['launch_parameter']['job_name'] = '-'.join(
          [type.lower(), now, uuid.uuid4().hex[:8]]
      )
    remote_runner.create_job(type, job_spec)
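
As a usage illustration only (not part of the diff), a minimal sketch of calling the remote runner directly; the project ID, Cloud Storage paths, and local output path are hypothetical placeholders:

import json

from google_cloud_pipeline_components.container.preview.dataflow.flex_template import remote_runner

# Hypothetical payload; launch_flex_template generates a job_name when one
# is not provided, so only the template spec path is supplied here.
payload = json.dumps({
    'launch_parameter': {
        'container_spec_gcs_path': 'gs://my-bucket/templates/spec.json',
        'parameters': {'input': 'gs://my-bucket/input.csv'},
    }
})

remote_runner.launch_flex_template(
    type='DataflowJob',
    project='my-project',          # placeholder project ID
    location='us-central1',
    payload=payload,
    gcp_resources='/tmp/gcp_resources.json',
)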

View File

@ -0,0 +1,22 @@
# Copyright 2023 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Experimnental Dataflow components."""
from google_cloud_pipeline_components.preview.dataflow.flex_template.component import dataflow_flex_template as DataflowFlexTemplateJobOp
from google_cloud_pipeline_components.v1.dataflow import DataflowPythonJobOp
__all__ = [
'DataflowFlexTemplateJobOp',
'DataflowPythonJobOp',
]

View File

@ -0,0 +1,14 @@
# Copyright 2023 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Google Cloud Pipelines Dataflow Flex Template Component."""

View File

@ -0,0 +1,270 @@
# Copyright 2023 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import Dict, List

from google_cloud_pipeline_components import _image
from kfp.dsl import ConcatPlaceholder
from kfp.dsl import container_component
from kfp.dsl import ContainerSpec
from kfp.dsl import OutputPath


@container_component
def dataflow_flex_template(
    project: str,
    container_spec_gcs_path: str,
    gcp_resources: OutputPath(str),
    location: str = 'us-central1',
    job_name: str = '',
    parameters: Dict[str, str] = {},
    launch_options: Dict[str, str] = {},
    num_workers: int = 0,
    max_workers: int = 0,
    service_account_email: str = '',
    temp_location: str = '',
    machine_type: str = '',
    additional_experiments: List[str] = [],
    network: str = '',
    subnetwork: str = '',
    additional_user_labels: Dict[str, str] = {},
    kms_key_name: str = '',
    ip_configuration: str = '',
    worker_region: str = '',
    worker_zone: str = '',
    enable_streaming_engine: bool = False,
    flexrs_goal: str = '',
    staging_location: str = '',
    sdk_container_image: str = '',
    disk_size_gb: int = 0,
    autoscaling_algorithm: str = '',
    dump_heap_on_oom: bool = False,
    save_heap_dumps_to_gcs_path: str = '',
    launcher_machine_type: str = '',
    enable_launcher_vm_serial_port_logging: bool = False,
    update: bool = False,
    transform_name_mappings: Dict[str, str] = {},
    validate_only: bool = False,
):
  # fmt: off
  """Launch a job with a Dataflow Flex Template.

  Args:
    project: The ID of the Cloud Platform project that the job belongs to.
    location: The regional endpoint to which to direct the request. E.g., us-central1,
      us-west1. Defaults to `us-central1` if not set.
    job_name: The job name to use for the created job. For update job requests, the job
      name should be the same as the existing running job. If none is specified,
      a default name will be generated by the component.
    container_spec_gcs_path: Cloud Storage path to a file with a json serialized ContainerSpec
      as content.
    parameters: The parameters for the flex template. Ex. {"my_template_param":"5"}
    launch_options: Launch options for this flex template job. This is a common set of options
      across languages and templates. This should not be used to pass job
      parameters.
    num_workers: The initial number of Google Compute Engine instances for the job. If
      empty or unspecified, the Dataflow service determines an appropriate
      number of workers.
    max_workers: The maximum number of Google Compute Engine instances to be made available
      to your pipeline during execution, from 1 to 1000. If empty or
      unspecified, the Dataflow service determines a default maximum number of
      instances. For more details, see
      https://cloud.google.com/dataflow/docs/horizontal-autoscaling.
    service_account_email: The email address of the service account to run the job as. If
      unspecified, the Dataflow service uses the project's Compute Engine
      default service account.
    temp_location: The Cloud Storage path to use for temporary files. Must be a valid Cloud
      Storage URL, beginning with gs://. For more details, see
      https://cloud.google.com/dataflow/docs/guides/setting-pipeline-options#setting_required_options.
    machine_type: The machine type to use for the Dataflow job. Defaults to the value from
      the template if not specified.
    additional_experiments: Additional experiment flags for the job.
    network: Network to which VMs will be assigned. If empty or unspecified, the
      service will use the network "default".
    subnetwork: Subnetwork to which VMs will be assigned, if desired. You can specify a
      subnetwork using either a complete URL or an abbreviated path.
      Expected to be of the form
      "https://www.googleapis.com/compute/v1/projects/HOST_PROJECT_ID/regions/REGION/subnetworks/SUBNETWORK"
      or "regions/REGION/subnetworks/SUBNETWORK". If the subnetwork is located
      in a Shared VPC network, you must use the complete URL.
    additional_user_labels: Additional user labels to be specified for the job. Keys and values must
      follow the restrictions specified in the labeling restrictions page
      (https://cloud.google.com/compute/docs/labeling-resources#restrictions).
      An object containing a list of "key": value pairs.
      Example: { "name": "wrench", "mass": "1kg", "count": "3" }.
    kms_key_name: Name for the Cloud KMS key for the job. Key format is
      "projects/HOST_PROJECT_ID/locations/LOCATION/keyRings/KEYRING_ID/cryptoKeys/CRYPTO_KEY_ID".
    ip_configuration: Configuration for VM IPs.
    worker_region: The Compute Engine region
      (https://cloud.google.com/compute/docs/regions-zones/regions-zones) in
      which worker processing should occur, e.g. "us-west1". Mutually exclusive
      with worker_zone. If neither worker_region nor worker_zone is specified,
      defaults to the control plane's region.
    worker_zone: The Compute Engine zone
      (https://cloud.google.com/compute/docs/regions-zones/regions-zones) in
      which worker processing should occur, e.g. "us-west1-a". Mutually
      exclusive with worker_region. If neither worker_region nor worker_zone is
      specified, a zone in the control plane's region is chosen based on
      available capacity.
    enable_streaming_engine: Whether to enable Streaming Engine for the job.
    flexrs_goal: Set FlexRS goal for the job. For more details, see
      https://cloud.google.com/dataflow/docs/guides/flexrs.
    staging_location: The Cloud Storage path for staging local files. Must be a valid Cloud
      Storage URL, beginning with gs://. For more details, see
      https://cloud.google.com/dataflow/docs/guides/setting-pipeline-options#setting_required_options.
    sdk_container_image: Docker registry location (e.g. Artifact Registry) of the container image
      to use for the worker harness. Default is the container for the version of
      the SDK. Note this field is only valid for portable Dataflow pipeline
      jobs.
    disk_size_gb: Worker disk size, in gigabytes. If empty or unspecified, the Dataflow
      service determines an appropriate disk size.
    autoscaling_algorithm: The algorithm to use for autoscaling. If empty or unspecified, the
      Dataflow service sets a default value. For more details, see
      https://cloud.google.com/dataflow/docs/reference/pipeline-options#resource_utilization.
    dump_heap_on_oom: If true, when processing time is spent almost entirely on garbage
      collection (GC), saves a heap dump before ending the thread or process.
      If false, ends the thread or process without saving a heap dump. Does not
      save a heap dump when the Java Virtual Machine (JVM) has an out of memory
      error during processing. The location of the heap file is either echoed
      back to the user, or the user is given the opportunity to download the
      heap file.
    save_heap_dumps_to_gcs_path: Cloud Storage bucket (directory) to upload heap dumps to. Enabling this
      field implies that dump_heap_on_oom is set to true.
    launcher_machine_type: The machine type to use for launching the Dataflow job. The default is
      n1-standard-1.
    enable_launcher_vm_serial_port_logging: If true, serial port logging will be enabled for the launcher VM.
    update: Set this to true if you are sending a request to update a running
      streaming job. When set, the job name should be the same as the running
      job.
    transform_name_mappings: Use this to pass transformNameMappings for streaming update jobs.
      Ex: {"oldTransformName":"newTransformName",...}. For more details, see
      https://cloud.google.com/dataflow/docs/guides/updating-a-pipeline#Mapping
    validate_only: If true, the request is validated but not actually executed. Defaults to
      false.

  Returns:
    gcp_resources: Serialized gcp_resources proto tracking the Dataflow job. For more details, see
      https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/proto/README.md.
  """
  # fmt: on
  return ContainerSpec(
      image=_image.GCPC_IMAGE_TAG,
      command=[
          'python3',
          '-u',
          '-m',
          'google_cloud_pipeline_components.container.preview.dataflow.flex_template.launcher',
      ],
      args=[
          '--type',
          'DataflowJob',
          '--project',
          project,
          '--location',
          location,
          '--payload',
          ConcatPlaceholder([
              '{',
              '"launch_parameter": {',
              '"job_name": "',
              job_name,
              '"',
              ', "container_spec_gcs_path": "',
              container_spec_gcs_path,
              '"',
              ', "parameters": ',
              parameters,
              ', "launch_options": ',
              launch_options,
              ', "environment": {',
              '"num_workers": ',
              num_workers,
              ', "max_workers": ',
              max_workers,
              ', "service_account_email": "',
              service_account_email,
              '"',
              ', "temp_location": "',
              temp_location,
              '"',
              ', "machine_type": "',
              machine_type,
              '"',
              ', "additional_experiments": ',
              additional_experiments,
              ', "network": "',
              network,
              '"',
              ', "subnetwork": "',
              subnetwork,
              '"',
              ', "additional_user_labels": ',
              additional_user_labels,
              ', "kms_key_name": "',
              kms_key_name,
              '"',
              ', "ip_configuration": "',
              ip_configuration,
              '"',
              ', "worker_region": "',
              worker_region,
              '"',
              ', "worker_zone": "',
              worker_zone,
              '"',
              ', "enable_streaming_engine": ',
              enable_streaming_engine,
              ', "flexrs_goal": "',
              flexrs_goal,
              '"',
              ', "staging_location": "',
              staging_location,
              '"',
              ', "sdk_container_image": "',
              sdk_container_image,
              '"',
              ', "disk_size_gb": ',
              disk_size_gb,
              ', "autoscaling_algorithm": "',
              autoscaling_algorithm,
              '"',
              ', "dump_heap_on_oom": ',
              dump_heap_on_oom,
              ', "save_heap_dumps_to_gcs_path": "',
              save_heap_dumps_to_gcs_path,
              '"',
              ', "launcher_machine_type": "',
              launcher_machine_type,
              '"',
              ', "enable_launcher_vm_serial_port_logging": ',
              enable_launcher_vm_serial_port_logging,
              '}',
              ', "update": ',
              update,
              ', "transform_name_mappings": ',
              transform_name_mappings,
              '}',
              ', "validate_only": ',
              validate_only,
              '}',
          ]),
          '--gcp_resources',
          gcp_resources,
      ],
  )
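
For context, a minimal sketch of how the new preview component might be consumed from a KFP pipeline; the pipeline name, project ID, and Cloud Storage paths are hypothetical placeholders, not part of this commit:

from google_cloud_pipeline_components.preview.dataflow import DataflowFlexTemplateJobOp
from kfp import dsl


@dsl.pipeline(name='dataflow-flex-template-sample')  # hypothetical pipeline name
def pipeline():
  # Launch a Flex Template job; gcp_resources is produced as an output.
  DataflowFlexTemplateJobOp(
      project='my-project',                                          # placeholder project ID
      location='us-central1',
      container_spec_gcs_path='gs://my-bucket/templates/spec.json',  # placeholder template spec
      parameters={'input': 'gs://my-bucket/input.csv'},
  )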