feat(components): Add utility function for constructing the serverless custom job launcher spec

PiperOrigin-RevId: 538221200
This commit is contained in:
Yang Pan 2023-06-06 10:17:57 -07:00 committed by Google Cloud Pipeline Components maintainers
parent 27d8f7a651
commit 6fb10f3738
2 changed files with 74 additions and 44 deletions

View File

@ -16,18 +16,63 @@
import copy
import json
import re
from typing import Any, List, Optional
from typing import Any, Dict, List, Optional
from google_cloud_pipeline_components import _image
from kfp import components
# note: this is a slight dependency on KFP SDK implementation details
# other code should not similarly depend on the stability of kfp.placeholders
from kfp import dsl
from kfp.components import placeholders
from google.protobuf import json_format
# note: this is a slight dependency on KFP SDK implementation details
# other code should not similarly depend on the stability of kfp.placeholders
DOCS_INTEGRATED_OUTPUT_RENAMING_PREFIX = "output__"
def build_serverless_customjob_container_spec(
    *,
    project: str,
    location: str,
    custom_job_payload: Dict[str, Any],
    gcp_resources: dsl.OutputPath(str),  # pytype: disable=invalid-annotation
) -> dsl.ContainerSpec:
  """Assembles the container spec that runs the custom job launcher.

  The returned spec invokes the
  ``google_cloud_pipeline_components.container.v1.custom_job.launcher``
  module with ``--type CustomJob`` and forwards the remaining arguments as
  launcher flags.

  Args:
    project: Value forwarded to the launcher as ``--project``.
    location: Value forwarded to the launcher as ``--location``.
    custom_job_payload: Dictionary describing the custom job. It is serialized
      with ``container_component_dumps`` and forwarded as ``--payload``.
    gcp_resources: Output path forwarded to the launcher as
      ``--gcp_resources``.

  Returns:
    A ``dsl.ContainerSpec`` built on ``_image.GCPC_IMAGE_TAG`` that launches
    the custom job with the given payload.
  """
  launcher_image = _image.GCPC_IMAGE_TAG

  # Run the launcher as a module; "-u" is passed to python3 ahead of "-m".
  launcher_command = [
      "python3",
      "-u",
      "-m",
      "google_cloud_pipeline_components.container.v1.custom_job.launcher",
  ]

  # Serialize via the KFP-aware dumper rather than a plain json.dumps call.
  serialized_payload = container_component_dumps(custom_job_payload)

  launcher_args = ["--type", "CustomJob"]
  launcher_args += ["--payload", serialized_payload]
  launcher_args += ["--project", project]
  launcher_args += ["--location", location]
  launcher_args += ["--gcp_resources", gcp_resources]

  return dsl.ContainerSpec(
      image=launcher_image,
      command=launcher_command,
      args=launcher_args,
  )
def container_component_dumps(obj: Any) -> Any:
"""Dump object to JSON string with KFP SDK placeholders included and, if the placeholder does not correspond to a runtime string, quotes escaped.

View File

@ -98,47 +98,32 @@ def custom_training_job(
gcp_resources: Serialized gcp_resources proto tracking the custom job. For more details, see https://github.com/kubeflow/pipelines/blob/master/components/google-cloud/google_cloud_pipeline_components/proto/README.md.
"""
# fmt: on
return dsl.ContainerSpec(
image=_image.GCPC_IMAGE_TAG,
command=[
'python3',
'-u',
'-m',
'google_cloud_pipeline_components.container.v1.custom_job.launcher',
],
args=[
'--type',
'CustomJob',
'--payload',
utils.container_component_dumps({
'display_name': display_name,
'job_spec': {
'worker_pool_specs': worker_pool_specs,
'scheduling': {
'timeout': timeout,
'restart_job_on_worker_restart': (
restart_job_on_worker_restart
),
},
'service_account': service_account,
'tensorboard': tensorboard,
'enable_web_access': enable_web_access,
'network': network,
'reserved_ip_ranges': reserved_ip_ranges,
'base_output_directory': {
'output_uri_prefix': base_output_directory
},
return utils.build_serverless_customjob_container_spec(
project=project,
location=location,
custom_job_payload={
'display_name': display_name,
'job_spec': {
'worker_pool_specs': worker_pool_specs,
'scheduling': {
'timeout': timeout,
'restart_job_on_worker_restart': (
restart_job_on_worker_restart
),
},
'labels': labels,
'encryption_spec_key_name': {
'kms_key_name': encryption_spec_key_name
'service_account': service_account,
'tensorboard': tensorboard,
'enable_web_access': enable_web_access,
'network': network,
'reserved_ip_ranges': reserved_ip_ranges,
'base_output_directory': {
'output_uri_prefix': base_output_directory
},
}),
'--project',
project,
'--location',
location,
'--gcp_resources',
gcp_resources,
],
},
'labels': labels,
'encryption_spec_key_name': {
'kms_key_name': encryption_spec_key_name
},
},
gcp_resources=gcp_resources,
)