chore(sdk): improve IR read/write testing (#7920)

* add more load component test cases

* add read write equality tests

* add copyright

* remove duplicative structures tests

* add if __name__ == '__main__' block
This commit is contained in:
Connor McCarthy 2022-06-24 09:45:15 -06:00 committed by GitHub
parent 58f7ab8b49
commit 281151c051
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 294 additions and 77 deletions

View File

@ -0,0 +1,85 @@
# Copyright 2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Test-case configuration consumed by the IR read/write tests.
# Each top-level key names a test group containing:
#   'test_cases':    file stems (no extension); each case has a .py and/or
#                    .yaml file under 'test_data_dir'.
#   'test_data_dir': directory of the test data, relative to the project root.
#   'config':
#       'read':  whether the cases support deserialization from YAML
#                (load the compiled file back and compare component specs).
#       'write': whether the cases support compilation from a Python file
#                (compile and compare against the golden YAML).
CONFIG = {
    'pipelines': {
        'test_cases': [
            'pipeline_with_importer',
            'pipeline_with_ontology',
            'pipeline_with_if_placeholder',
            'pipeline_with_concat_placeholder',
            'pipeline_with_resource_spec',
            'pipeline_with_various_io_types',
            'pipeline_with_reused_component',
            'pipeline_with_after',
            'pipeline_with_condition',
            'pipeline_with_nested_conditions',
            'pipeline_with_nested_conditions_yaml',
            'pipeline_with_loops',
            'pipeline_with_nested_loops',
            'pipeline_with_loops_and_conditions',
            'pipeline_with_params_containing_format',
            'lightweight_python_functions_pipeline',
            'lightweight_python_functions_with_outputs',
            'xgboost_sample_pipeline',
            'pipeline_with_metrics_outputs',
            'pipeline_with_exit_handler',
            'pipeline_with_env',
            'component_with_optional_inputs',
            'pipeline_with_gcpc_types',
            'pipeline_with_placeholders',
            'pipeline_with_task_final_status',
            'pipeline_with_task_final_status_yaml',
            'component_with_pip_index_urls',
        ],
        'test_data_dir': 'sdk/python/kfp/compiler/test_data/pipelines',
        'config': {
            'read': False,
            'write': True
        }
    },
    'components': {
        'test_cases': [
            'add_numbers',
            'component_with_pip_install',
            'concat_message',
            'dict_input',
            'identity',
            'nested_return',
            'output_artifact',
            'output_metrics',
            'preprocess',
        ],
        'test_data_dir': 'sdk/python/kfp/compiler/test_data/components',
        'config': {
            'read': True,
            'write': True
        }
    },
    'v1_components': {
        'test_cases': [
            # TODO: this component currently has a bug when extracting the component description from the command
            # 'concat_placeholder_component',
            # TODO: these three currently have placeholder bugs -- uncomment after fix
            # 'if_placeholder_component',
            # 'add_component',
            # 'ingestion_component',
        ],
        'test_data_dir': 'sdk/python/kfp/compiler/test_data/v1_component_yaml',
        'config': {
            'read': True,
            'write': False
        }
    }
}

View File

@ -12,7 +12,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from ast import Assert
import json
import os
import re
@ -870,51 +869,9 @@ class TestWriteIrToFile(unittest.TestCase):
PIPELINES_TEST_DATA_DIR = os.path.join(
os.path.dirname(__file__), 'test_data', 'pipelines')
SUPPORTED_COMPONENTS_TEST_DATA_DIR = os.path.join(
os.path.dirname(__file__), 'test_data', 'components')
UNSUPPORTED_COMPONENTS_TEST_DATA_DIR = os.path.join(
os.path.dirname(__file__), 'test_data', 'components', 'unsupported')
PIPELINE_TEST_CASES = [
'pipeline_with_importer',
'pipeline_with_ontology',
'pipeline_with_if_placeholder',
'pipeline_with_concat_placeholder',
'pipeline_with_resource_spec',
'pipeline_with_various_io_types',
'pipeline_with_reused_component',
'pipeline_with_after',
'pipeline_with_condition',
'pipeline_with_nested_conditions',
'pipeline_with_nested_conditions_yaml',
'pipeline_with_loops',
'pipeline_with_nested_loops',
'pipeline_with_loops_and_conditions',
'pipeline_with_params_containing_format',
'lightweight_python_functions_pipeline',
'lightweight_python_functions_with_outputs',
'xgboost_sample_pipeline',
'pipeline_with_metrics_outputs',
'pipeline_with_exit_handler',
'pipeline_with_env',
'component_with_optional_inputs',
'pipeline_with_gcpc_types',
'pipeline_with_placeholders',
'pipeline_with_task_final_status',
'pipeline_with_task_final_status_yaml',
'component_with_pip_index_urls',
]
SUPPORTED_COMPONENT_TEST_CASES = [
'add_numbers',
'component_with_pip_install',
'concat_message',
'dict_input',
'identity',
'nested_return',
'output_artifact',
'output_metrics',
'preprocess',
]
UNSUPPORTED_COMPONENT_TEST_CASES = [
'artifact_consumer',
'output_named_tuple',
@ -923,7 +880,7 @@ UNSUPPORTED_COMPONENT_TEST_CASES = [
]
class TestCompile(parameterized.TestCase):
class TestReadWriteEquality(parameterized.TestCase):
@classmethod
def setUpClass(cls):
@ -997,17 +954,6 @@ class TestCompile(parameterized.TestCase):
self.assertIn('Deprecated. Please use `kfp dsl compile` instead.)',
res.stdout.decode('utf-8'))
@parameterized.parameters(PIPELINE_TEST_CASES)
def test_compile_pipelines(self, file: str):
self._test_compile(file, directory=PIPELINES_TEST_DATA_DIR)
@parameterized.parameters(SUPPORTED_COMPONENT_TEST_CASES)
def test_compile_components(self, component_name: str):
self._test_compile(
component_name,
directory=SUPPORTED_COMPONENTS_TEST_DATA_DIR,
fn=component_name)
@parameterized.parameters(UNSUPPORTED_COMPONENT_TEST_CASES)
def test_compile_unsupported_components(self, component_name: str):
with self.assertRaisesRegex(TypeError, r'is not a valid input'):

View File

@ -0,0 +1,194 @@
# Copyright 2022 The Kubeflow Authors
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import re
import sys
import tempfile
import types
from typing import Any, Callable, Dict, List, Optional, Union
import unittest
from absl.testing import parameterized
from kfp import compiler
from kfp import components
from kfp.compiler._read_write_test_config import CONFIG
from kfp.components import pipeline_context
from kfp.components import python_component
import yaml
# Absolute path to the repository root (five directory levels above this file).
PROJECT_ROOT = os.path.abspath(os.path.join(__file__, *([os.path.pardir] * 5)))


def expand_config(config: dict) -> List[Dict[str, Any]]:
    """Expands the test config into a flat list of parameterized test kwargs.

    Args:
        config: Mapping of test group name to a dict with keys 'test_cases'
            (list of file stems), 'test_data_dir' (path relative to the
            project root), and 'config' (dict with 'read'/'write' flags).

    Returns:
        One dict per test case with keys 'name' ('<group>-<case>'),
        'test_case', 'test_data_dir' (absolute), 'read', 'write', and
        'function' -- the function name to compile, which is None for the
        'pipelines' group (the pipeline function is discovered from the
        module instead).
    """
    parameters: List[Dict[str, Any]] = []
    for name, test_group in config.items():
        test_data_dir = os.path.join(PROJECT_ROOT, test_group['test_data_dir'])
        # Bind under a distinct name; the original rebinding of `config`
        # shadowed the function parameter.
        group_config = test_group['config']
        parameters.extend({
            'name': name + '-' + test_case,
            'test_case': test_case,
            'test_data_dir': test_data_dir,
            'read': group_config['read'],
            'write': group_config['write'],
            'function': None if name == 'pipelines' else test_case,
        } for test_case in test_group['test_cases'])
    return parameters
def collect_pipeline_from_module(
    target_module: types.ModuleType
) -> Union[Callable[..., Any], python_component.PythonComponent]:
    """Returns the single pipeline function defined in ``target_module``.

    Raises:
        ValueError: If the module does not contain exactly one pipeline
            function.
    """
    pipelines = [
        getattr(target_module, attr)
        for attr in dir(target_module)
        if pipeline_context.Pipeline.is_pipeline_func(
            getattr(target_module, attr))
    ]
    if len(pipelines) != 1:
        raise ValueError(
            f'Expect one pipeline function in module {target_module}, got {len(pipelines)}: {pipelines}. Please specify the pipeline function name with --function.'
        )
    return pipelines[0]
def collect_pipeline_func(
    python_file: str,
    function_name: Optional[str] = None
) -> Union[Callable[..., Any], python_component.PythonComponent]:
    """Imports ``python_file`` and returns the object to compile.

    Args:
        python_file: Path to the Python file to import.
        function_name: Name of the pipeline function or component to fetch
            from the module. If None, the module must contain exactly one
            pipeline function, which is returned.

    Raises:
        ValueError: If ``function_name`` is not found in the module, or --
            when ``function_name`` is None -- the module does not contain
            exactly one pipeline function.
    """
    # Make the file's directory importable for the duration of the import.
    sys.path.insert(0, os.path.dirname(python_file))
    try:
        filename = os.path.basename(python_file)
        module_name = os.path.splitext(filename)[0]
        if function_name is None:
            return collect_pipeline_from_module(
                target_module=__import__(module_name))
        module = __import__(module_name, fromlist=[function_name])
        if not hasattr(module, function_name):
            # Name the module in the error; the previous message printed the
            # unhelpful literal '(unknown)'.
            raise ValueError(
                f'Pipeline function or component "{function_name}" not found in module {module_name}.'
            )
        return getattr(module, function_name)
    finally:
        # Always restore sys.path, even if the import raises.
        del sys.path[0]
def ignore_kfp_version_helper(spec: Dict[str, Any]) -> Dict[str, Any]:
    """Ignores kfp sdk versioning in command.

    Takes in a YAML input and ignores the kfp sdk versioning in command
    for comparison between compiled file and goldens.
    """
    pipeline_spec = spec.get('pipelineSpec', spec)
    deployment_spec = pipeline_spec['deploymentSpec']
    if 'executors' in deployment_spec:
        executors = deployment_spec['executors']
        for executor_name in executors:
            # Round-trip each executor through YAML text so the pinned
            # 'kfp==x.y.z' requirement can be scrubbed with a regex.
            dumped = yaml.dump(executors[executor_name], sort_keys=True)
            scrubbed = re.sub(r"'kfp==(\d+).(\d+).(\d+)(-[a-z]+.\d+)?'", 'kfp',
                              dumped)
            executors[executor_name] = yaml.safe_load(scrubbed)
    return spec
def load_compiled_file(filename: str) -> Dict[str, Any]:
    """Loads a compiled YAML file with version-specific fields normalized.

    Drops 'sdkVersion' and scrubs the pinned kfp version from executor
    commands so files compiled by different SDK versions compare equal.
    """
    with open(filename) as f:
        contents = yaml.safe_load(f)
    pipeline_spec = contents.get('pipelineSpec', contents)
    # ignore the sdkVersion
    del pipeline_spec['sdkVersion']
    return ignore_kfp_version_helper(contents)
class ReadWriteTest(parameterized.TestCase):
    """Round-trip tests for every case declared in CONFIG."""

    def _compile_and_load_component(
        self, compilable: Union[Callable[..., Any],
                                python_component.PythonComponent]):
        """Compiles to a temporary YAML file, then loads it back as a component."""
        with tempfile.TemporaryDirectory() as tmp_dir:
            tmp_file = os.path.join(tmp_dir, 're_compiled_output.yaml')
            compiler.Compiler().compile(compilable, tmp_file)
            return components.load_component_from_file(tmp_file)

    def _compile_and_read_yaml(
        self, compilable: Union[Callable[..., Any],
                                python_component.PythonComponent]):
        """Compiles to a temporary YAML file, then parses the YAML contents."""
        with tempfile.TemporaryDirectory() as tmp_dir:
            tmp_file = os.path.join(tmp_dir, 're_compiled_output.yaml')
            compiler.Compiler().compile(compilable, tmp_file)
            return load_compiled_file(tmp_file)

    def _test_serialization_deserialization_consistency(self, yaml_file: str):
        """Tests serialization and deserialization consistency."""
        original_component = components.load_component_from_file(yaml_file)
        reloaded_component = self._compile_and_load_component(
            original_component)
        self.assertEqual(original_component.component_spec,
                         reloaded_component.component_spec)

    def _test_serialization_correctness(self,
                                        python_file: str,
                                        yaml_file: str,
                                        function_name: Optional[str] = None):
        """Tests serialization correctness."""
        pipeline = collect_pipeline_func(
            python_file, function_name=function_name)
        compiled_result = self._compile_and_read_yaml(pipeline)
        golden_result = load_compiled_file(yaml_file)
        self.assertEqual(compiled_result, golden_result)

    # Fixed: redundant double parentheses around CONFIG.
    @parameterized.parameters(expand_config(CONFIG))
    def test(
        self,
        name: str,
        test_case: str,
        test_data_dir: str,
        function: Optional[str],
        read: bool,
        write: bool,
    ):
        """Tests serialization and deserialization consistency and correctness.

        Args:
            name (str): '{test_group_name}-{test_case_name}'. Useful for print statements/debugging.
            test_case (str): Test case name (without file extension).
            test_data_dir (str): The directory containing the test case files.
            function (str, optional): The function name to compile.
            read (bool): Whether the pipeline/component supports deserialization from YAML (IR, except for V1 component YAML backward compatibility tests).
            write (bool): Whether the pipeline/component supports compilation from a Python file.
        """
        yaml_file = os.path.join(test_data_dir, f'{test_case}.yaml')
        py_file = os.path.join(test_data_dir, f'{test_case}.py')
        if write:
            self._test_serialization_correctness(
                python_file=py_file,
                yaml_file=yaml_file,
                function_name=function)
        if read:
            self._test_serialization_deserialization_consistency(
                yaml_file=yaml_file)
# Allow running this test module directly (e.g. `python <file>`).
if __name__ == '__main__':
    unittest.main()

View File

@ -14,15 +14,12 @@
"""Tests for kfp.components.structures."""
import os
import sys
import tempfile
import textwrap
import unittest
from absl.testing import parameterized
from kfp import compiler
from kfp.compiler.compiler_test import SUPPORTED_COMPONENT_TEST_CASES
from kfp.compiler.compiler_test import SUPPORTED_COMPONENTS_TEST_DATA_DIR
from kfp.components import placeholders
from kfp.components import structures
@ -617,24 +614,6 @@ class TestReadInComponent(parameterized.TestCase):
self.assertEqual(component_spec.implementation.container.image,
'alpine')
@parameterized.parameters(SUPPORTED_COMPONENT_TEST_CASES)
def test_read_in_all_v2_components(self, file):
try:
sys.path.insert(0, SUPPORTED_COMPONENTS_TEST_DATA_DIR)
mod = __import__(file, fromlist=[file])
component = getattr(mod, file)
finally:
del sys.path[0]
with tempfile.TemporaryDirectory() as tmpdir:
component_file = os.path.join(tmpdir, 'component.yaml')
compiler.Compiler().compile(component, component_file)
with open(component_file, 'r') as f:
yaml_str = f.read()
loaded_component_spec = structures.ComponentSpec.load_from_component_yaml(
yaml_str)
self.assertEqual(component.component_spec, loaded_component_spec)
def test_simple_placeholder(self):
compiled_yaml = textwrap.dedent("""
components:

View File

@ -19,9 +19,9 @@ import textwrap
import unittest
from unittest import mock
import requests
from kfp.components import structures
from kfp.components import yaml_component
import requests
SAMPLE_YAML = textwrap.dedent("""\
components:
@ -72,6 +72,19 @@ schemaVersion: 2.1.0
sdkVersion: kfp-2.0.0-alpha.3
""")
V1_COMPONENTS_TEST_DATA_DIR = os.path.join(
os.path.dirname(os.path.dirname(__file__)), 'compiler', 'test_data',
'v1_component_yaml')
V1_COMPONENT_YAML_TEST_CASES = [
'concat_placeholder_component.yaml',
'ingestion_component.yaml',
'serving_component.yaml',
'if_placeholder_component.yaml',
'trainer_component.yaml',
'add_component.yaml',
]
class YamlComponentTest(unittest.TestCase):