feat(sdk.v2): Support `set_display_name` in v2. (#6471)

* Support `set_display_name` in v2.

* update comment in proto

* fix test
Author: Chen Sun, 2021-08-30 12:07:25 -07:00 (committed by GitHub)
parent 63913bec25
commit cfefc6d397
7 changed files with 18 additions and 30 deletions
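
For context, the user-facing effect of this change: with the v2 (KFP IR) compiler, calling `.set_display_name()` on a task now propagates into the compiled task's `taskInfo.name`, while the DAG task key keeps the unique (sanitized) task name. Below is a minimal usage sketch modeled on the updated two-step test pipeline in this commit; the component YAML bodies are illustrative reconstructions (not verbatim from the repo), and the kfp 1.7.x `kfp.v2.compiler` entry point is assumed.

from kfp import components, dsl
from kfp.v2 import compiler

# Illustrative container components, loaded the same way as in the test file.
component_op_1 = components.load_component_from_text("""
name: Write to GCS
inputs:
- {name: text, type: String}
outputs:
- {name: output_gcs_path, type: GCSPath}
implementation:
  container:
    image: google/cloud-sdk:slim
    command: [sh, -c, 'echo "$0" | gsutil cp - "$1"', {inputValue: text}, {outputUri: output_gcs_path}]
""")

component_op_2 = components.load_component_from_text("""
name: Read from GCS
inputs:
- {name: input_gcs_path, type: GCSPath}
implementation:
  container:
    image: google/cloud-sdk:slim
    command: [sh, -c, 'gsutil cat "$0"', {inputUri: input_gcs_path}]
""")

@dsl.pipeline(name='simple-two-step-pipeline', pipeline_root='dummy_root')
def my_pipeline(text: str = 'Hello world!'):
    # The display names set here end up in taskInfo.name of the compiled spec;
    # the DAG task keys stay 'write-to-gcs' and 'read-from-gcs'.
    component_1 = component_op_1(text=text).set_display_name('Producer')
    component_2 = component_op_2(
        input_gcs_path=component_1.outputs['output_gcs_path'])
    component_2.set_display_name('Consumer')

if __name__ == '__main__':
    compiler.Compiler().compile(
        pipeline_func=my_pipeline, package_path='two_step_pipeline.json')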


@@ -562,8 +562,7 @@ message ArtifactTypeSchema {
// The basic info of a task.
message PipelineTaskInfo {
-// The unique name of the task within the pipeline definition. This name
-// will be used in downstream tasks to indicate task and data dependencies.
+// The display name of the task.
string name = 1;
}
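
In other words, `PipelineTaskInfo.name` is re-documented as a human-readable display name rather than the unique task identifier; uniqueness now comes from the task's key in the DAG. A trivial sketch with the generated Python proto (message and field names as in the hunk above):

from kfp.pipeline_spec import pipeline_spec_pb2

# taskInfo.name now carries the display name; it no longer needs to be unique
# within the pipeline, since the DAG task key identifies the task.
task_info = pipeline_spec_pb2.PipelineTaskInfo(name='Producer')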


@@ -597,9 +597,6 @@ def _attach_v2_specs(
if not component_spec.name:
component_spec.name = _components._default_component_name
-# task.name is unique at this point.
-pipeline_task_spec.task_info.name = (dsl_utils.sanitize_task_name(task.name))
resolved_cmd = _resolve_commands_and_args_v2(
component_spec=component_spec, arguments=arguments)


@@ -38,7 +38,6 @@ from kfp.pipeline_spec import pipeline_spec_pb2
from kfp.v2.components.types import artifact_types, type_utils
from kfp.v2.components import component_factory
_GroupOrOp = Union[dsl.OpsGroup, dsl.BaseOp]
@@ -559,10 +558,9 @@ class Compiler(object):
artifact_types.Metrics.TYPE_NAME,
artifact_types.ClassificationMetrics.TYPE_NAME,
]:
-unique_output_name = '{}-{}'.format(op_task_spec.task_info.name,
-output_name)
+unique_output_name = '{}-{}'.format(op.name, output_name)
-sub_task_name = op_task_spec.task_info.name
+sub_task_name = op.name
sub_task_output = output_name
for component_name, task_name in parent_components_and_tasks:
group_component_spec = (
@@ -624,6 +622,9 @@ class Compiler(object):
subgroup_component_spec = getattr(subgroup, 'component_spec',
pipeline_spec_pb2.ComponentSpec())
+display_name = getattr(subgroup, 'display_name', None)
+subgroup_task_spec.task_info.name = display_name or subgroup.name
is_recursive_subgroup = (
isinstance(subgroup, dsl.OpsGroup) and subgroup.recursive_ref)
if is_recursive_subgroup:
@@ -632,9 +633,6 @@
else:
subgroup_key = subgroup.name
-subgroup_task_spec.task_info.name = (
-subgroup_task_spec.task_info.name or
-dsl_utils.sanitize_task_name(subgroup_key))
# human_name exists for ops only, and is used to de-dupe component spec.
subgroup_component_name = (
subgroup_task_spec.component_ref.name or
@@ -648,10 +646,9 @@
if isinstance(subgroup, dsl.ContainerOp):
if hasattr(subgroup, 'importer_spec'):
-importer_task_name = subgroup.task_spec.task_info.name
importer_comp_name = subgroup.task_spec.component_ref.name
importer_exec_label = subgroup.component_spec.executor_label
-group_component_spec.dag.tasks[importer_task_name].CopyFrom(
+group_component_spec.dag.tasks[subgroup.name].CopyFrom(
subgroup.task_spec)
pipeline_spec.components[importer_comp_name].CopyFrom(
subgroup.component_spec)
@@ -820,8 +817,7 @@
subgroup_component_spec)
# Add task spec
-group_component_spec.dag.tasks[
-subgroup_task_spec.task_info.name].CopyFrom(subgroup_task_spec)
+group_component_spec.dag.tasks[subgroup.name].CopyFrom(subgroup_task_spec)
# Add AIPlatformCustomJobSpec, if applicable.
custom_job_spec = getattr(subgroup, 'custom_job_spec', None)
@@ -929,7 +925,10 @@
exit_handler_op = first_group.exit_op
# Add exit op task spec
-task_name = exit_handler_op.task_spec.task_info.name
+task_name = exit_handler_op.name
+display_name = exit_handler_op.display_name
+exit_handler_op.task_spec.task_info.name = display_name or task_name
exit_handler_op.task_spec.dependent_tasks.extend(
pipeline_spec.root.dag.tasks.keys())
exit_handler_op.task_spec.trigger_policy.strategy = (
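
Taken together, the compiler changes above reduce to one naming rule: DAG tasks are keyed by the unique task name (e.g. `subgroup.name`), while `taskInfo.name` carries the display name when one was set, falling back to the task name. A toy sketch of that rule follows; it uses plain Python data structures, not the compiler's actual types.

from dataclasses import dataclass
from typing import Optional

@dataclass
class Task:
    name: str                            # unique, sanitized task name (DAG key)
    display_name: Optional[str] = None   # set via .set_display_name()

def task_info_name(task: Task) -> str:
    # Fallback applied by the compiler: display name if set, else the task name.
    return task.display_name or task.name

tasks = [Task('write-to-gcs', 'Producer'), Task('read-from-gcs')]
dag = {t.name: {'taskInfo': {'name': task_info_name(t)}} for t in tasks}
# dag['write-to-gcs']['taskInfo']['name'] == 'Producer'
# dag['read-from-gcs']['taskInfo']['name'] == 'read-from-gcs'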


@@ -89,7 +89,7 @@
}
},
"taskInfo": {
"name": "read-from-gcs"
"name": "Consumer"
}
},
"write-to-gcs": {
@@ -107,7 +107,7 @@
}
},
"taskInfo": {
"name": "write-to-gcs"
"name": "Producer"
}
}
}
@@ -121,7 +121,7 @@
}
},
"schemaVersion": "2.0.0",
"sdkVersion": "kfp-1.7.0"
"sdkVersion": "kfp-1.7.2"
},
"runtimeConfig": {
"gcsOutputDirectory": "dummy_root",


@@ -56,9 +56,10 @@ implementation:
@dsl.pipeline(name='simple-two-step-pipeline', pipeline_root='dummy_root')
def my_pipeline(text: str = 'Hello world!'):
-component_1 = component_op_1(text=text)
+component_1 = component_op_1(text=text).set_display_name('Producer')
component_2 = component_op_2(
input_gcs_path=component_1.outputs['output_gcs_path'])
+component_2.set_display_name('Consumer')
if __name__ == '__main__':


@@ -64,7 +64,6 @@ def _build_importer_task_spec(
An importer node task spec.
"""
result = pipeline_spec_pb2.PipelineTaskSpec()
-result.task_info.name = dsl_utils.sanitize_task_name(importer_base_name)
result.component_ref.name = dsl_utils.sanitize_component_name(
importer_base_name)


@@ -12,11 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from absl.testing import parameterized
import unittest
from absl.testing import parameterized
from google.protobuf import json_format
from kfp.dsl import _pipeline_param
from kfp.pipeline_spec import pipeline_spec_pb2 as pb
from kfp.v2.components import importer_node
@@ -73,9 +72,6 @@ class ImporterNodeTest(parameterized.TestCase):
'importer_name': 'importer-1',
'input_uri': 'gs://artifact',
'expected_result': {
-'taskInfo': {
-'name': 'importer-1'
-},
'inputs': {
'parameters': {
'uri': {
@@ -97,9 +93,6 @@ class ImporterNodeTest(parameterized.TestCase):
'importer_name': 'importer-2',
'input_uri': _pipeline_param.PipelineParam(name='uri_to_import'),
'expected_result': {
-'taskInfo': {
-'name': 'importer-2'
-},
'inputs': {
'parameters': {
'uri': {