feat(sdk): Add comments to IR YAML file (#8467)

* base

* add tests

* fix bug

* address comments

* address comments 2

* sort comments

* sort signatures

* add idempotent test

* add idempotent test 2

* support multiline docstring

* review

* docformatter presubmit exclude

* docformatter presubmit exclude

* docformatter

* docformatter

* merge 1

* update readme

* nit .items()

* remove redundant test
JOCSTAA 2022-12-05 14:43:14 -08:00 committed by GitHub
parent b166c18736
commit 49db63c916
10 changed files with 553 additions and 51 deletions

View File: .pre-commit-config.yaml

@@ -43,3 +43,4 @@ repos:
        name: docformatter
        language: python
        entry: docformatter -i -r
+       exclude: sdk/python/kfp/compiler/compiler_test.py

View File: sdk/RELEASE.md

@@ -1,6 +1,7 @@
# Current Version (in development)
## Features
* Add comments to IR YAML file [\#8467](https://github.com/kubeflow/pipelines/pull/8467)
## Breaking changes

View File: sdk/python/kfp/compiler/compiler.py

@@ -21,8 +21,6 @@ from typing import Any, Callable, Dict, Optional, Union
from kfp.compiler import pipeline_spec_builder as builder
from kfp.components import base_component
from kfp.components import graph_component
from kfp.components import yaml_component
from kfp.components.types import type_utils
@@ -79,5 +77,8 @@ class Compiler:
            pipeline_name=pipeline_name,
            pipeline_parameters=pipeline_parameters,
        )
        builder.write_pipeline_spec_to_file(
-           pipeline_spec=pipeline_spec, package_path=package_path)
+           pipeline_spec=pipeline_spec,
+           pipeline_description=pipeline_func.description,
+           package_path=package_path)
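
With this change, the compiler threads the pipeline's description into the written file. A minimal usage sketch (not part of the diff; the pipeline and output filename below are hypothetical):

from kfp import compiler, dsl

@dsl.component
def say_hello(name: str) -> str:
    return f'Hello, {name}'

@dsl.pipeline()
def hello_pipeline(name: str = 'world') -> str:
    """Greets the given name."""
    return say_hello(name=name).output

compiler.Compiler().compile(
    pipeline_func=hello_pipeline, package_path='hello_pipeline.yaml')

# hello_pipeline.yaml should now begin with a header along these lines:
# # PIPELINE DEFINITION
# # Name: hello-pipeline
# # Description: Greets the given name.
# # Inputs:
# #    name: str [Default: 'world']
# # Outputs:
# #    Output: str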

View File: sdk/python/kfp/compiler/compiler_test.py

@@ -18,6 +18,7 @@ import os
import re
import subprocess
import tempfile
import textwrap
from typing import Any, Dict, List, NamedTuple, Optional
import unittest
@@ -29,6 +30,11 @@ from kfp import dsl
from kfp.cli import cli
from kfp.compiler import compiler
from kfp.components.types import type_utils
from kfp.dsl import ContainerSpec
from kfp.dsl import Input
from kfp.dsl import Model
from kfp.dsl import Output
from kfp.dsl import OutputPath
from kfp.dsl import PipelineTaskFinalStatus
from kfp.pipeline_spec import pipeline_spec_pb2
import yaml
@@ -1893,5 +1899,426 @@ class TestCannotUseAfterCrossDAG(unittest.TestCase):
                pipeline_func=my_pipeline, package_path=package_path)


class TestYamlComments(unittest.TestCase):

    def test_comments_include_inputs_and_outputs_and_pipeline_name(self):

        @dsl.component
        def identity(string: str, model: bool) -> str:
            return string

        @dsl.pipeline()
        def my_pipeline(sample_input1: bool = True,
                        sample_input2: str = 'string') -> str:
            op1 = identity(string=sample_input2, model=sample_input1)
            result = op1.output
            return result

        with tempfile.TemporaryDirectory() as tmpdir:
            pipeline_spec_path = os.path.join(tmpdir, 'output.yaml')
            compiler.Compiler().compile(
                pipeline_func=my_pipeline, package_path=pipeline_spec_path)
            with open(pipeline_spec_path, 'r+') as f:
                yaml_content = f.read()

        inputs_string = textwrap.dedent("""\
            # Inputs:
            #    sample_input1: bool [Default: True]
            #    sample_input2: str [Default: 'string']
            """)

        outputs_string = textwrap.dedent("""\
            # Outputs:
            #    Output: str
            """)

        name_string = '# Name: my-pipeline'

        self.assertIn(name_string, yaml_content)
        self.assertIn(inputs_string, yaml_content)
        self.assertIn(outputs_string, yaml_content)

    def test_comments_include_definition(self):

        @dsl.component
        def identity(string: str, model: bool) -> str:
            return string

        @dsl.pipeline()
        def pipeline_with_no_definition(sample_input1: bool = True,
                                        sample_input2: str = 'string') -> str:
            op1 = identity(string=sample_input2, model=sample_input1)
            result = op1.output
            return result

        with tempfile.TemporaryDirectory() as tmpdir:
            pipeline_spec_path = os.path.join(tmpdir, 'output.yaml')
            compiler.Compiler().compile(
                pipeline_func=pipeline_with_no_definition,
                package_path=pipeline_spec_path)
            with open(pipeline_spec_path, 'r+') as f:
                yaml_content = f.read()

        description_string = '# Description:'
        self.assertNotIn(description_string, yaml_content)

        @dsl.pipeline()
        def pipeline_with_definition(sample_input1: bool = True,
                                     sample_input2: str = 'string') -> str:
            """This is a definition of this pipeline."""
            op1 = identity(string=sample_input2, model=sample_input1)
            result = op1.output
            return result

        with tempfile.TemporaryDirectory() as tmpdir:
            pipeline_spec_path = os.path.join(tmpdir, 'output.yaml')
            compiler.Compiler().compile(
                pipeline_func=pipeline_with_definition,
                package_path=pipeline_spec_path)
            with open(pipeline_spec_path, 'r+') as f:
                yaml_content = f.read()

        description_string = '# Description:'
        self.assertIn(description_string, yaml_content)

    def test_comments_on_pipeline_with_no_inputs_or_outputs(self):

        @dsl.component
        def identity(string: str, model: bool) -> str:
            return string

        @dsl.pipeline()
        def pipeline_with_no_inputs() -> str:
            op1 = identity(string='string', model=True)
            result = op1.output
            return result

        with tempfile.TemporaryDirectory() as tmpdir:
            pipeline_spec_path = os.path.join(tmpdir, 'output.yaml')
            compiler.Compiler().compile(
                pipeline_func=pipeline_with_no_inputs,
                package_path=pipeline_spec_path)
            with open(pipeline_spec_path, 'r+') as f:
                yaml_content = f.read()

        inputs_string = '# Inputs:'
        self.assertNotIn(inputs_string, yaml_content)

        @dsl.pipeline()
        def pipeline_with_no_outputs(sample_input1: bool = True,
                                     sample_input2: str = 'string'):
            identity(string=sample_input2, model=sample_input1)

        with tempfile.TemporaryDirectory() as tmpdir:
            pipeline_spec_path = os.path.join(tmpdir, 'output.yaml')
            compiler.Compiler().compile(
                pipeline_func=pipeline_with_no_outputs,
                package_path=pipeline_spec_path)
            with open(pipeline_spec_path, 'r+') as f:
                yaml_content = f.read()

        outputs_string = '# Outputs:'
        self.assertNotIn(outputs_string, yaml_content)

    def test_comments_follow_pattern(self):

        @dsl.component
        def identity(string: str, model: bool) -> str:
            return string

        @dsl.pipeline()
        def my_pipeline(sample_input1: bool = True,
                        sample_input2: str = 'string') -> str:
            """This is a definition of this pipeline."""
            op1 = identity(string=sample_input2, model=sample_input1)
            result = op1.output
            return result

        with tempfile.TemporaryDirectory() as tmpdir:
            pipeline_spec_path = os.path.join(tmpdir, 'output.yaml')
            compiler.Compiler().compile(
                pipeline_func=my_pipeline, package_path=pipeline_spec_path)
            with open(pipeline_spec_path, 'r+') as f:
                yaml_content = f.read()

        pattern_sample = textwrap.dedent("""\
            # PIPELINE DEFINITION
            # Name: my-pipeline
            # Description: This is a definition of this pipeline.
            # Inputs:
            #    sample_input1: bool [Default: True]
            #    sample_input2: str [Default: 'string']
            # Outputs:
            #    Output: str
            """)

        self.assertIn(pattern_sample, yaml_content)

    def test_verbose_comment_characteristics(self):

        @dsl.component
        def output_model(metrics: Output[Model]):
            """Dummy component that outputs metrics with a random accuracy."""
            import random
            result = random.randint(0, 100)
            metrics.log_metric('accuracy', result)

        @dsl.pipeline(name='Test pipeline')
        def my_pipeline(sample_input1: bool,
                        sample_input2: str,
                        sample_input3: Input[Model],
                        sample_input4: float = 3.14,
                        sample_input5: list = [1],
                        sample_input6: dict = {'one': 1},
                        sample_input7: int = 5) -> Model:
            """This is a definition of this pipeline."""
            task = output_model()
            return task.output

        with tempfile.TemporaryDirectory() as tmpdir:
            pipeline_spec_path = os.path.join(tmpdir, 'output.yaml')
            compiler.Compiler().compile(
                pipeline_func=my_pipeline, package_path=pipeline_spec_path)
            with open(pipeline_spec_path, 'r+') as f:
                yaml_content = f.read()

        predicted_comment = textwrap.dedent("""\
            # PIPELINE DEFINITION
            # Name: test-pipeline
            # Description: This is a definition of this pipeline.
            # Inputs:
            #    sample_input1: bool
            #    sample_input2: str
            #    sample_input3: system.Model
            #    sample_input4: float [Default: 3.14]
            #    sample_input5: list [Default: [1.0]]
            #    sample_input6: dict [Default: {'one': 1.0}]
            #    sample_input7: int [Default: 5.0]
            # Outputs:
            #    Output: system.Model
            """)

        self.assertIn(predicted_comment, yaml_content)

    def test_comments_on_compiled_components(self):

        @dsl.component
        def my_component(string: str, model: bool) -> str:
            """component description."""
            return string

        with tempfile.TemporaryDirectory() as tmpdir:
            pipeline_spec_path = os.path.join(tmpdir, 'output.yaml')
            compiler.Compiler().compile(
                pipeline_func=my_component, package_path=pipeline_spec_path)
            with open(pipeline_spec_path, 'r+') as f:
                yaml_content = f.read()

        predicted_comment = textwrap.dedent("""\
            # PIPELINE DEFINITION
            # Name: my-component
            # Description: component description.
            # Inputs:
            #    model: bool
            #    string: str
            """)

        self.assertIn(predicted_comment, yaml_content)

        @dsl.container_component
        def my_container_component(text: str, output_path: OutputPath(str)):
            """component description."""
            return ContainerSpec(
                image='python:3.7',
                command=['my_program', text],
                args=['--output_path', output_path])

        with tempfile.TemporaryDirectory() as tmpdir:
            pipeline_spec_path = os.path.join(tmpdir, 'output.yaml')
            compiler.Compiler().compile(
                pipeline_func=my_container_component,
                package_path=pipeline_spec_path)
            with open(pipeline_spec_path, 'r+') as f:
                yaml_content = f.read()

        predicted_comment = textwrap.dedent("""\
            # PIPELINE DEFINITION
            # Name: my-container-component
            # Description: component description.
            # Inputs:
            #    text: str
            """)

        self.assertIn(predicted_comment, yaml_content)

    def test_comments_idempotency(self):

        @dsl.component
        def identity(string: str, model: bool) -> str:
            return string

        @dsl.pipeline()
        def my_pipeline(sample_input1: bool = True,
                        sample_input2: str = 'string') -> str:
            """My description."""
            op1 = identity(string=sample_input2, model=sample_input1)
            result = op1.output
            return result

        with tempfile.TemporaryDirectory() as tmpdir:
            pipeline_spec_path = os.path.join(tmpdir, 'output.yaml')
            compiler.Compiler().compile(
                pipeline_func=my_pipeline, package_path=pipeline_spec_path)
            with open(pipeline_spec_path, 'r+') as f:
                yaml_content = f.read()
            comp = components.load_component_from_file(pipeline_spec_path)
            compiler.Compiler().compile(
                pipeline_func=comp, package_path=pipeline_spec_path)
            with open(pipeline_spec_path, 'r+') as f:
                reloaded_yaml_content = f.read()

        predicted_comment = textwrap.dedent("""\
            # PIPELINE DEFINITION
            # Name: my-pipeline
            # Description: My description.
            # Inputs:
            #    sample_input1: bool [Default: True]
            #    sample_input2: str [Default: 'string']
            # Outputs:
            #    Output: str
            """)

        # test initial comments
        self.assertIn(predicted_comment, yaml_content)
        # test reloaded comments
        self.assertIn(predicted_comment, reloaded_yaml_content)

    def test_comment_with_multiline_docstring(self):

        @dsl.component
        def identity(string: str, model: bool) -> str:
            return string

        @dsl.pipeline()
        def pipeline_with_multiline_definition(
                sample_input1: bool = True,
                sample_input2: str = 'string') -> str:
            """docstring short description.
            docstring long description. docstring long description.
            """
            op1 = identity(string=sample_input2, model=sample_input1)
            result = op1.output
            return result

        with tempfile.TemporaryDirectory() as tmpdir:
            pipeline_spec_path = os.path.join(tmpdir, 'output.yaml')
            compiler.Compiler().compile(
                pipeline_func=pipeline_with_multiline_definition,
                package_path=pipeline_spec_path)
            with open(pipeline_spec_path, 'r+') as f:
                yaml_content = f.read()

        description_string = textwrap.dedent("""\
            # Description: docstring short description.
            #              docstring long description. docstring long description.
            """)

        self.assertIn(description_string, yaml_content)

        @dsl.pipeline()
        def pipeline_with_multiline_definition(
                sample_input1: bool = True,
                sample_input2: str = 'string') -> str:
            """
            docstring long description.
            docstring long description.
            docstring long description.
            """
            op1 = identity(string=sample_input2, model=sample_input1)
            result = op1.output
            return result

        with tempfile.TemporaryDirectory() as tmpdir:
            pipeline_spec_path = os.path.join(tmpdir, 'output.yaml')
            compiler.Compiler().compile(
                pipeline_func=pipeline_with_multiline_definition,
                package_path=pipeline_spec_path)
            with open(pipeline_spec_path, 'r+') as f:
                yaml_content = f.read()

        description_string = textwrap.dedent("""\
            # Description: docstring long description.
            #              docstring long description.
            #              docstring long description.
            """)

        self.assertIn(description_string, yaml_content)

    def test_idempotency_on_comment_with_multiline_docstring(self):

        @dsl.component
        def identity(string: str, model: bool) -> str:
            return string

        @dsl.pipeline()
        def my_pipeline(sample_input1: bool = True,
                        sample_input2: str = 'string') -> str:
            """docstring short description.
            docstring long description.
            docstring long description.
            """
            op1 = identity(string=sample_input2, model=sample_input1)
            result = op1.output
            return result

        with tempfile.TemporaryDirectory() as tmpdir:
            pipeline_spec_path = os.path.join(tmpdir, 'output.yaml')
            compiler.Compiler().compile(
                pipeline_func=my_pipeline, package_path=pipeline_spec_path)
            with open(pipeline_spec_path, 'r+') as f:
                yaml_content = f.read()
            comp = components.load_component_from_file(pipeline_spec_path)
            compiler.Compiler().compile(
                pipeline_func=comp, package_path=pipeline_spec_path)
            with open(pipeline_spec_path, 'r+') as f:
                reloaded_yaml_content = f.read()

        predicted_comment = textwrap.dedent("""\
            # PIPELINE DEFINITION
            # Name: my-pipeline
            # Description: docstring short description.
            #              docstring long description.
            #              docstring long description.
            # Inputs:
            #    sample_input1: bool [Default: True]
            #    sample_input2: str [Default: 'string']
            # Outputs:
            #    Output: str
            """)

        # test initial comments
        self.assertIn(predicted_comment, yaml_content)
        # test reloaded comments
        self.assertIn(predicted_comment, reloaded_yaml_content)


if __name__ == '__main__':
    unittest.main()

View File: sdk/python/kfp/compiler/pipeline_spec_builder.py

@@ -1604,6 +1604,7 @@ def create_pipeline_spec(
def write_pipeline_spec_to_file(pipeline_spec: pipeline_spec_pb2.PipelineSpec,
                                pipeline_description: str,
                                package_path: str) -> None:
    """Writes PipelineSpec into a YAML or JSON (deprecated) file.
@@ -1612,6 +1613,8 @@ def write_pipeline_spec_to_file(pipeline_spec: pipeline_spec_pb2.PipelineSpec,
        package_path (str): The path to which to write the PipelineSpec.
    """
    json_dict = json_format.MessageToDict(pipeline_spec)
    yaml_comments = extract_comments_from_pipeline_spec(json_dict,
                                                        pipeline_description)

    if package_path.endswith('.json'):
        warnings.warn(
@@ -1626,8 +1629,89 @@ def write_pipeline_spec_to_file(pipeline_spec: pipeline_spec_pb2.PipelineSpec,
    elif package_path.endswith(('.yaml', '.yml')):
        with open(package_path, 'w') as yaml_file:
            yaml_file.write(yaml_comments)
            yaml.dump(json_dict, yaml_file, sort_keys=True)
    else:
        raise ValueError(
            f'The output path {package_path} should end with ".yaml".')


def extract_comments_from_pipeline_spec(pipeline_spec: dict,
                                        pipeline_description: str) -> str:
    map_parameter_types = {
        'NUMBER_INTEGER': int.__name__,
        'NUMBER_DOUBLE': float.__name__,
        'STRING': str.__name__,
        'BOOLEAN': bool.__name__,
        'LIST': list.__name__,
        'STRUCT': dict.__name__
    }

    map_headings = {
        'inputDefinitions': '# Inputs:',
        'outputDefinitions': '# Outputs:'
    }

    def collect_pipeline_signatures(root_dict: dict,
                                    signature_type: str) -> List[str]:
        comment_strings = []
        if signature_type in root_dict:
            signature = root_dict[signature_type]
            comment_strings.append(map_headings[signature_type])

            # Collect data
            array_of_signatures = []
            for parameter_name, parameter_body in signature.get(
                    'parameters', {}).items():
                data = {}
                data['name'] = parameter_name
                data['parameterType'] = map_parameter_types[
                    parameter_body['parameterType']]
                if 'defaultValue' in signature['parameters'][parameter_name]:
                    data['defaultValue'] = signature['parameters'][
                        parameter_name]['defaultValue']
                    if isinstance(data['defaultValue'], str):
                        data['defaultValue'] = "'" + data['defaultValue'] + "'"
                array_of_signatures.append(data)

            for artifact_name, artifact_body in signature.get('artifacts',
                                                              {}).items():
                data = {
                    'name': artifact_name,
                    'parameterType': artifact_body['artifactType']['schemaTitle']
                }
                array_of_signatures.append(data)

            array_of_signatures = sorted(
                array_of_signatures, key=lambda d: d.get('name'))

            # Present data
            for signature in array_of_signatures:
                string = '#    ' + signature['name'] + ': ' + signature[
                    'parameterType']
                if 'defaultValue' in signature:
                    string += ' [Default: ' + str(
                        signature['defaultValue']) + ']'
                comment_strings.append(string)

        return comment_strings

    # Aligns continuation lines under the text of '# Description: '.
    multi_line_description_prefix = '#              '
    comment_sections = []
    comment_sections.append('# PIPELINE DEFINITION')
    comment_sections.append('# Name: ' + pipeline_spec['pipelineInfo']['name'])
    if pipeline_description:
        pipeline_description = f'\n{multi_line_description_prefix}'.join(
            pipeline_description.splitlines())
        comment_sections.append('# Description: ' + pipeline_description)
    comment_sections.extend(
        collect_pipeline_signatures(pipeline_spec['root'], 'inputDefinitions'))
    comment_sections.extend(
        collect_pipeline_signatures(pipeline_spec['root'],
                                    'outputDefinitions'))

    comment = '\n'.join(comment_sections) + '\n'

    return comment
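
For illustration, a rough sketch of what this helper returns for a small spec dict (the dict shape mirrors json_format.MessageToDict output; the names and values below are hypothetical):

spec = {
    'pipelineInfo': {'name': 'my-pipeline'},
    'root': {
        'inputDefinitions': {
            'parameters': {
                'threshold': {
                    'parameterType': 'NUMBER_DOUBLE',
                    'defaultValue': 0.5,
                },
            },
        },
    },
}
print(extract_comments_from_pipeline_spec(spec, 'An example.'))
# Expected, given the code above:
# # PIPELINE DEFINITION
# # Name: my-pipeline
# # Description: An example.
# # Inputs:
# #    threshold: float [Default: 0.5]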

View File: sdk/python/kfp/compiler/pipeline_spec_builder_test.py

@@ -13,8 +13,6 @@
# limitations under the License.
"""Tests for kfp.compiler.pipeline_spec_builder."""

-import os
-import tempfile
import unittest

from absl.testing import parameterized
@@ -259,46 +257,5 @@ def pipeline_spec_from_file(filepath: str) -> str:
    return json_format.ParseDict(dictionary, pipeline_spec_pb2.PipelineSpec())


-class TestWriteIrToFile(unittest.TestCase):
-
-    @classmethod
-    def setUpClass(cls) -> None:
-        pipeline_spec = pipeline_spec_pb2.PipelineSpec()
-        pipeline_spec.pipeline_info.name = 'pipeline-name'
-        cls.pipeline_spec = pipeline_spec
-
-    def test_yaml(self):
-        with tempfile.TemporaryDirectory() as tempdir:
-            temp_filepath = os.path.join(tempdir, 'output.yaml')
-            pipeline_spec_builder.write_pipeline_spec_to_file(
-                self.pipeline_spec, temp_filepath)
-            actual = pipeline_spec_from_file(temp_filepath)
-        self.assertEqual(actual, self.pipeline_spec)
-
-    def test_yml(self):
-        with tempfile.TemporaryDirectory() as tempdir:
-            temp_filepath = os.path.join(tempdir, 'output.yml')
-            pipeline_spec_builder.write_pipeline_spec_to_file(
-                self.pipeline_spec, temp_filepath)
-            actual = pipeline_spec_from_file(temp_filepath)
-        self.assertEqual(actual, self.pipeline_spec)
-
-    def test_json(self):
-        with tempfile.TemporaryDirectory() as tempdir, self.assertWarnsRegex(
-                DeprecationWarning, r'Compiling to JSON is deprecated'):
-            temp_filepath = os.path.join(tempdir, 'output.json')
-            pipeline_spec_builder.write_pipeline_spec_to_file(
-                self.pipeline_spec, temp_filepath)
-            actual = pipeline_spec_from_file(temp_filepath)
-        self.assertEqual(actual, self.pipeline_spec)
-
-    def test_incorrect_extension(self):
-        with tempfile.TemporaryDirectory() as tempdir, self.assertRaisesRegex(
-                ValueError, r'should end with "\.yaml"\.'):
-            temp_filepath = os.path.join(tempdir, 'output.txt')
-            pipeline_spec_builder.write_pipeline_spec_to_file(
-                self.pipeline_spec, temp_filepath)
-
if __name__ == '__main__':
    unittest.main()

View File: sdk/python/kfp/components/base_component.py

@@ -39,6 +39,7 @@ class BaseComponent(abc.ABC):
        """
        self.component_spec = component_spec
        self.name = component_spec.name
+       self.description = component_spec.description or None

        # Arguments typed as PipelineTaskFinalStatus are special arguments that
        # do not count as user inputs. Instead, they are reserved for the
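
A small sketch of what the new attribute exposes (hypothetical component; assumes the docstring handling added in component_factory.py below):

from kfp import dsl

@dsl.component
def trainer(data: str) -> str:
    """Trains the model."""
    return data

# Any BaseComponent instance now surfaces its docstring-derived description:
assert trainer.description == 'Trains the model.'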

View File: sdk/python/kfp/components/component_factory.py

@@ -317,8 +317,13 @@ def extract_component_interface(
    component_name = getattr(
        func, '_component_human_name',
        _python_function_name_to_component_name(func.__name__))
-   description = getattr(func, '_component_description',
-                         parsed_docstring.short_description)
+   short_description = parsed_docstring.short_description
+   long_description = parsed_docstring.long_description
+   docstring_description = short_description + '\n' + long_description if long_description else short_description
+   description = getattr(func, '_component_description',
+                         docstring_description)

    if description:
        description = description.strip()
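
The short/long split comes from the docstring_parser package that the KFP SDK already depends on. A quick sketch of how the two pieces combine under this change (the function and docstring below are hypothetical):

import docstring_parser

def my_pipeline():
    """Short summary.

    Longer body, first line.
    Longer body, second line.
    """

parsed = docstring_parser.parse(my_pipeline.__doc__)
# parsed.short_description -> 'Short summary.'
# parsed.long_description  -> 'Longer body, first line.\nLonger body, second line.'
# Combined as above: 'Short summary.\nLonger body, first line.\nLonger body, second line.'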

View File: sdk/python/kfp/components/structures.py

@@ -829,6 +829,26 @@ class ComponentSpec:
            Component spec in the form of V2 ComponentSpec.
        """

        def extract_description(component_yaml: str) -> Union[str, None]:
            heading = '# Description: '
            # One space shorter than the prefix written by the compiler;
            # the '+ 1' slice below compensates.
            multi_line_description_prefix = '#             '
            index_of_heading = 2
            if heading in component_yaml:
                description = component_yaml.splitlines()[index_of_heading]

                # Multi line
                comments = component_yaml.splitlines()
                index = index_of_heading + 1
                while comments[index][:len(multi_line_description_prefix
                                          )] == multi_line_description_prefix:
                    description += '\n' + comments[index][
                        len(multi_line_description_prefix) + 1:]
                    index += 1
                return description[len(heading):]
            else:
                return None
        json_component = yaml.safe_load(component_yaml)
        is_v1 = 'implementation' in set(json_component.keys())
        if is_v1:
@@ -836,7 +856,12 @@ class ComponentSpec:
                component_yaml)
            return cls.from_v1_component_spec(v1_component)
        else:
-           return ComponentSpec.from_pipeline_spec_dict(json_component)
+           component_spec = ComponentSpec.from_pipeline_spec_dict(
+               json_component)
+           if not component_spec.description:
+               component_spec.description = extract_description(
+                   component_yaml=component_yaml)
+           return component_spec
    def save_to_component_yaml(self, output_file: str) -> None:
        """Saves ComponentSpec into IR YAML file.
@@ -847,7 +872,7 @@ class ComponentSpec:
        from kfp.compiler import pipeline_spec_builder as builder

        pipeline_spec = self.to_pipeline_spec()
-       builder.write_pipeline_spec_to_file(pipeline_spec, output_file)
+       builder.write_pipeline_spec_to_file(pipeline_spec, None, output_file)

    def to_pipeline_spec(self) -> pipeline_spec_pb2.PipelineSpec:
        """Creates a pipeline instance and constructs the pipeline spec for a

View File: test/presubmit-docformatter-sdk.sh

@@ -18,4 +18,4 @@ source_root=$(pwd)

python3 -m pip install --upgrade pip
python3 -m pip install $(grep 'docformatter==' sdk/python/requirements-dev.txt)
-docformatter --check --recursive "${source_root}/sdk/python/" --exclude "${source_root}/sdk/python/kfp/deprecated"
+docformatter --check --recursive "${source_root}/sdk/python/" --exclude "compiler_test.py"