feat(components/google-cloud): Add support for labels in custom_job wrapper (#6579)

* Add support for labels in custom_job wrapper

* Add comment on V2 support

* fix lint error
This commit is contained in:
sina chavoshi 2021-09-17 17:18:23 -07:00 committed by GitHub
parent cd4f19128c
commit 400ed0c139
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 45 additions and 3 deletions

View File

@ -13,15 +13,16 @@
# limitations under the License.
"""Module for supporting Google Vertex AI Custom Job."""
# Prior to release of kfp V2, we have to use a mix of kfp v1 and v2.
# TODO(chavoshi): switch to using V2 only once it is ready.
import copy
import json
import tempfile
from typing import Callable, List, Optional, Mapping, Any
from kfp import components, dsl
from typing import Callable, List, Optional, Mapping, Any, Dict
from kfp import components
from kfp.dsl import dsl_utils
from kfp.v2.components.types import type_utils
from google_cloud_pipeline_components.aiplatform import utils
from kfp.components import structures
_DEFAULT_CUSTOM_JOB_MACHINE_TYPE = 'n1-standard-4'
@ -45,6 +46,7 @@ def run_as_vertex_ai_custom_job(
encryption_spec_key_name: Optional[str] = None,
tensorboard: Optional[str] = None,
base_output_directory: Optional[str] = None,
labels: Optional[Dict[str, str]] = None,
) -> Callable:
"""Run a pipeline task using AI Platform (Unified) custom training job.
@ -90,6 +92,8 @@ def run_as_vertex_ai_custom_job(
the baseOutputDirectory of each child CustomJob backing a Trial is set
to a subdirectory of name [id][Trial.id] under its parent
HyperparameterTuningJob's baseOutputDirectory.
labels: Optional. The labels with user-defined metadata to organize
CustomJobs. See https://goo.gl/xmQnxf for more information.
Returns:
A Custom Job component OP corresponding to the input component OP.
"""
@ -183,6 +187,11 @@ def run_as_vertex_ai_custom_job(
1)
job_spec['worker_pool_specs'].append(additional_worker_pool_spec)
# TODO(chavoshi): Use an input parameter instead of a hard-coded string label.
# This requires the Dictionary input type to be supported in V2.
if labels is not None:
job_spec['labels'] = labels
if timeout is not None:
if 'scheduling' not in job_spec:
job_spec['scheduling'] = {}

View File

@ -545,3 +545,36 @@ implementation:
self.assertDictContainsSubset(
subset=expected_sub_results,
dictionary=custom_job_spec.component_spec.to_dict())
def test_run_as_vertex_ai_custom_with_labels_converts_correctly(self):
  """Checks that `labels` is embedded in the generated CustomJob payload.

  Wraps the container-based test component with
  `run_as_vertex_ai_custom_job(..., labels=...)` and verifies that the
  `implementation` section of the resulting component spec equals the
  expected launcher container definition, including the `"labels"` entry
  inside the JSON string passed after `--payload`.
  """
  component_factory_function = self._create_a_container_based_component()

  expected_sub_results = {
      'implementation': {
          'container': {
              'image':
                  'test_launcher_image',
              'command': [
                  'python3', '-u', '-m',
                  'google_cloud_pipeline_components.experimental.remote.gcp_launcher.launcher'
              ],
              'args': [
                  '--type', 'CustomJob', '--payload',
                  '{"display_name": "ContainerComponent", "job_spec": {"worker_pool_specs": [{"machine_spec": {"machine_type": "n1-standard-4"}, "replica_count": 1, "container_spec": {"image_uri": "google/cloud-sdk:latest", "command": ["sh", "-c", "set -e -x\\necho \\"$0, this is an output parameter\\"\\n", "{{$.inputs.parameters[\'input_text\']}}", "{{$.outputs.parameters[\'output_value\'].output_file}}"]}}], "labels": {"test_key": "test_value"}, "service_account": "{{$.inputs.parameters[\'service_account}\']}}", "network": "{{$.inputs.parameters[\'network}\']}}", "encryption_spec_key_name": "{{$.inputs.parameters[\'encryption_spec_key_name}\']}}", "tensorboard": "{{$.inputs.parameters[\'tensorboard}\']}}", "base_output_directory": "{{$.inputs.parameters[\'base_output_directory}\']}}"}}',
                  '--project', {
                      'inputValue': 'project'
                  }, '--location', {
                      'inputValue': 'location'
                  }, '--gcp_resources', {
                      'outputPath': 'gcp_resources'
                  }
              ]
          }
      }
  }

  custom_job_spec = custom_job.run_as_vertex_ai_custom_job(
      component_factory_function, labels={"test_key": "test_value"})
  actual_spec = custom_job_spec.component_spec.to_dict()

  # assertDictContainsSubset is deprecated since Python 3.2 and removed in
  # Python 3.12. Overlaying the expected entries on the actual dict and
  # comparing for equality performs the same top-level subset check: every
  # expected key must be present with an equal value.
  self.assertEqual(actual_spec, {**actual_spec, **expected_sub_results})