test(components): make GCPC pipelinespec compilation tests robust to schema evolution

PiperOrigin-RevId: 531608264
Connor McCarthy 2023-05-12 14:41:36 -07:00 committed by Google Cloud Pipeline Components maintainers
parent cb93fcc5ca
commit 1996d7516d
9 changed files with 300 additions and 294 deletions
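Across the test files below, the recurring change is the same: the per-test compile-to-disk, JSON-load, delete-version-keys, and assertDictEqual boilerplate is replaced by one call to a shared golden-snapshot helper. A minimal sketch of the before and after shapes (the golden file name here is illustrative, not taken from the diff):

# Before: compile to a temp file, strip volatile version keys, compare dicts.
compiler.Compiler().compile(pipeline_func=pipeline, package_path=package_path)
with open(package_path) as f:
  actual = json.load(f, strict=False)
with open('testdata/golden_pipeline.json') as ef:
  expected = json.load(ef, strict=False)
del actual['sdkVersion']
del actual['schemaVersion']
self.assertDictEqual(actual, expected)

# After: compare the in-memory PipelineSpec against the golden snapshot,
# tolerating fields that newer schema versions add.
utils.assert_pipeline_equals_golden(
    self,
    pipeline,
    os.path.join(os.path.dirname(__file__), 'testdata/golden_pipeline.json'),
)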

View File

@@ -13,11 +13,12 @@
# limitations under the License.
"""Test google-cloud-pipeline-Components to ensure they compile correctly."""
import json
import os
from google_cloud_pipeline_components.experimental.notebooks import NotebooksExecutorOp
from google_cloud_pipeline_components.tests.v1 import utils
import kfp
from kfp import compiler
import unittest
@@ -33,14 +34,9 @@ class ComponentsCompileTest(unittest.TestCase):
self._location = 'us-central1'
self._master_type = 'n1-standard-4'
self._container_image_uri = 'gcr.io/deeplearning-platform-release/base-cpu'
self._package_path = os.path.join(
os.getenv('TEST_UNDECLARED_OUTPUTS_DIR'), 'pipeline.json'
)
def tearDown(self):
super(ComponentsCompileTest, self).tearDown()
if os.path.exists(self._package_path):
os.remove(self._package_path)
def test_notebooks_executor_op_compile(self):
@kfp.dsl.pipeline(name='notebooks-executor-test')
@@ -57,17 +53,11 @@ class ComponentsCompileTest(unittest.TestCase):
f'PROJECT_ID={self._project},EXECUTION_ID={self._execution_id}'
),
)
compiler.Compiler().compile(
pipeline_func=pipeline, package_path=self._package_path
utils.assert_pipeline_equals_golden(
self,
pipeline,
os.path.join(
os.path.dirname(__file__),
'../testdata/notebooks_executor_component_pipeline.json',
),
)
with open(self._package_path) as f:
executor_output_json = json.load(f, strict=False)
with open('testdata/notebooks_executor_component_pipeline.json') as ef:
expected_executor_output_json = json.load(ef, strict=False)
# Ignore the kfp SDK & schema version during comparison
del executor_output_json['sdkVersion']
del executor_output_json['schemaVersion']
self.assertDictEqual(executor_output_json, expected_executor_output_json)

View File

@@ -0,0 +1,13 @@
# Copyright 2023 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

View File

@@ -13,11 +13,12 @@
# limitations under the License.
"""Test google-cloud-pipeline-Components to ensure they compile correctly."""
import json
import os
from google_cloud_pipeline_components.v1.bigquery import BigqueryQueryJobOp
from google_cloud_pipeline_components.tests.v1 import utils
import kfp
from kfp import compiler
import unittest
@@ -32,14 +33,9 @@ class ComponentsCompileTest(unittest.TestCase):
self._job_configuration_query = {'priority': 'high'}
self._labels = {'key1': 'val1'}
self._encryption_spec_key_name = 'fake_encryption_key'
self._package_path = os.path.join(
os.getenv('TEST_UNDECLARED_OUTPUTS_DIR'), 'pipeline.json'
)
def tearDown(self):
super(ComponentsCompileTest, self).tearDown()
if os.path.exists(self._package_path):
os.remove(self._package_path)
def test_bigquery_query_job_op_compile(self):
@kfp.dsl.pipeline(name='bigquery-test')
@@ -53,17 +49,11 @@ class ComponentsCompileTest(unittest.TestCase):
labels=self._labels,
encryption_spec_key_name=self._encryption_spec_key_name,
)
compiler.Compiler().compile(
pipeline_func=pipeline, package_path=self._package_path
utils.assert_pipeline_equals_golden(
self,
pipeline,
os.path.join(
os.path.dirname(__file__),
'../testdata/bigquery_query_job_component_pipeline.json',
),
)
with open(self._package_path) as f:
executor_output_json = json.load(f, strict=False)
with open('testdata/bigquery_query_job_component_pipeline.json') as ef:
expected_executor_output_json = json.load(ef, strict=False)
# Ignore the kfp SDK & schema version during comparison
del executor_output_json['sdkVersion']
del executor_output_json['schemaVersion']
self.assertDictEqual(executor_output_json, expected_executor_output_json)

View File

@@ -13,12 +13,11 @@
# limitations under the License.
"""Test Custom training job to ensure the compile without error."""
import json
import os
from google_cloud_pipeline_components.v1.custom_job import CustomTrainingJobOp
from google_cloud_pipeline_components.tests.v1 import utils
import kfp
from kfp import compiler
import unittest
@@ -30,9 +29,6 @@ class CustomTrainingJobCompileTest(unittest.TestCase):
self._project = "test_project"
self._location = "us-central1"
self._test_input_string = "test_input_string"
self._package_path = os.path.join(
os.getenv("TEST_UNDECLARED_OUTPUTS_DIR"), "pipeline.json"
)
self._worker_pool_specs = [{
"machine_spec": {
"machine_type": "n1-standard-4",
@@ -48,8 +44,6 @@ class CustomTrainingJobCompileTest(unittest.TestCase):
def tearDown(self):
super(CustomTrainingJobCompileTest, self).tearDown()
if os.path.exists(self._package_path):
os.remove(self._package_path)
def test_custom_training_job_op_compile(self):
@kfp.dsl.pipeline(name="training-test")
@@ -63,23 +57,11 @@ class CustomTrainingJobCompileTest(unittest.TestCase):
labels=self._labels,
service_account=self._service_account,
)
compiler.Compiler().compile(
pipeline_func=pipeline, package_path=self._package_path
)
with open(self._package_path) as f:
executor_output_json = json.load(f, strict=False)
with open(
utils.assert_pipeline_equals_golden(
self,
pipeline,
os.path.join(
os.path.dirname(__file__),
"../testdata/custom_training_job_pipeline.json",
)
) as ef:
expected_executor_output_json = json.load(ef, strict=False)
# Ignore the kfp SDK & schema version during comparison
del executor_output_json["sdkVersion"]
del executor_output_json["schemaVersion"]
self.assertEqual(executor_output_json, expected_executor_output_json)
),
)

View File

@@ -13,13 +13,12 @@
# limitations under the License.
"""Test google-cloud-pipeline-Components to ensure the compile without error."""
import json
import os
from google_cloud_pipeline_components.v1.custom_job import utils
from google_cloud_pipeline_components.tests.v1 import testing_utils
import kfp
from kfp import components
from kfp import compiler
import unittest
@@ -31,7 +30,6 @@ class CustomTrainingJobWrapperCompileTest(unittest.TestCase):
self._project = "test_project"
self._location = "us-central1"
self._test_input_string = "test_input_string"
self._package_path = "pipeline.json"
self._container_component = components.load_component_from_text(
"name: Producer\ninputs:\n- {name: input_text, type: String,"
" description: 'Represents an input parameter.'}\noutputs:\n- {name:"
@@ -46,8 +44,6 @@ class CustomTrainingJobWrapperCompileTest(unittest.TestCase):
def tearDown(self):
super(CustomTrainingJobWrapperCompileTest, self).tearDown()
if os.path.exists(self._package_path):
os.remove(self._package_path)
def _create_a_python_based_component(self):
"""Creates a test Python-based component factory."""
@@ -70,23 +66,11 @@ class CustomTrainingJobWrapperCompileTest(unittest.TestCase):
project=self._project,
location=self._location,
)
compiler.Compiler().compile(
pipeline_func=pipeline, package_path=self._package_path
)
with open(self._package_path) as f:
executor_output_json = json.load(f, strict=False)
with open(
testing_utils.assert_pipeline_equals_golden(
self,
pipeline,
os.path.join(
os.path.dirname(__file__),
"../testdata/custom_training_job_wrapper_pipeline.json",
)
) as ef:
expected_executor_output_json = json.load(ef, strict=False)
# Ignore the kfp SDK & schema version during comparison
del executor_output_json["pipelineSpec"]["sdkVersion"]
del executor_output_json["pipelineSpec"]["schemaVersion"]
self.assertEqual(executor_output_json, expected_executor_output_json)
),
)

View File

@@ -13,11 +13,12 @@
# limitations under the License.
"""Test google-cloud-pipeline-Components to ensure they compile correctly."""
import json
import os
from google_cloud_pipeline_components.v1.dataflow import DataflowPythonJobOp
from google_cloud_pipeline_components.tests.v1 import utils
import kfp
from kfp import compiler
import unittest
@@ -33,14 +34,9 @@ class ComponentsCompileTest(unittest.TestCase):
self._gcs_source = 'gs://test_gcs_source'
self._temp_location = 'gs://temp_location'
self._pipeline_root = 'gs://test_pipeline_root'
self._package_path = os.path.join(
os.getenv('TEST_UNDECLARED_OUTPUTS_DIR'), 'pipeline.json'
)
def tearDown(self):
super(ComponentsCompileTest, self).tearDown()
if os.path.exists(self._package_path):
os.remove(self._package_path)
def test_dataflow_python_op_compile(self):
@kfp.dsl.pipeline(name='dataflow-python-test')
@@ -53,17 +49,11 @@ class ComponentsCompileTest(unittest.TestCase):
requirements_file_path=self._requirements_file_path,
args=self._args,
)
compiler.Compiler().compile(
pipeline_func=pipeline, package_path=self._package_path
utils.assert_pipeline_equals_golden(
self,
pipeline,
os.path.join(
os.path.dirname(__file__),
'../testdata/dataflow_python_job_component_pipeline.json',
),
)
with open(self._package_path) as f:
executor_output_json = json.load(f, strict=False)
with open('testdata/dataflow_python_job_component_pipeline.json') as ef:
expected_executor_output_json = json.load(ef, strict=False)
# Ignore the kfp SDK & schema version during comparison
del executor_output_json['sdkVersion']
del executor_output_json['schemaVersion']
self.assertDictEqual(executor_output_json, expected_executor_output_json)

View File

@@ -13,12 +13,11 @@
# limitations under the License.
"""Test google-cloud-pipeline-components to ensure they compile correctly."""
import json
import os
import kfp
from google_cloud_pipeline_components.v1 import dataproc
from kfp import compiler
from google_cloud_pipeline_components.tests.v1 import utils
import kfp
import unittest
@@ -56,14 +55,9 @@ class ComponentsCompileTest(unittest.TestCase):
self._archive_uris = ['test-archive-file-uri-1', 'test-archive-file-uri-2']
self._query_variables = {'foo': 'bar', 'fizz': 'buzz'}
self._batch_specific_args = ['test-arg-1', 'test-arg-2']
self._package_path = os.path.join(
os.getenv('TEST_UNDECLARED_OUTPUTS_DIR'), 'pipeline.json'
)
def tearDown(self):
super(ComponentsCompileTest, self).tearDown()
if os.path.exists(self._package_path):
os.remove(self._package_path)
def test_dataproc_create_pyspark_batch_op_compile(self):
"""Compile a test pipeline using the Dataproc PySparkBatch component."""
@@ -92,22 +86,14 @@ class ComponentsCompileTest(unittest.TestCase):
archive_uris=self._archive_uris,
args=self._batch_specific_args,
)
compiler.Compiler().compile(
pipeline_func=pipeline, package_path=self._package_path
utils.assert_pipeline_equals_golden(
self,
pipeline,
os.path.join(
os.path.dirname(__file__),
'../testdata/dataproc_create_pyspark_batch_component_pipeline.json',
),
)
with open(self._package_path) as f:
executor_output_json = json.load(f, strict=False)
with open(
'testdata/dataproc_create_pyspark_batch_component_pipeline.json'
) as ef:
expected_executor_output_json = json.load(ef, strict=False)
# Ignore the kfp SDK & schema version during comparison
del executor_output_json['sdkVersion']
del executor_output_json['schemaVersion']
self.assertDictEqual(executor_output_json, expected_executor_output_json)
def test_dataproc_create_spark_batch_op_compile(self):
"""Compile a test pipeline using the Dataproc SparkBatch component."""
@@ -136,22 +122,14 @@ class ComponentsCompileTest(unittest.TestCase):
archive_uris=self._archive_uris,
args=self._batch_specific_args,
)
compiler.Compiler().compile(
pipeline_func=pipeline, package_path=self._package_path
utils.assert_pipeline_equals_golden(
self,
pipeline,
os.path.join(
os.path.dirname(__file__),
'../testdata/dataproc_create_spark_batch_component_pipeline.json',
),
)
with open(self._package_path) as f:
executor_output_json = json.load(f, strict=False)
with open(
'testdata/dataproc_create_spark_batch_component_pipeline.json'
) as ef:
expected_executor_output_json = json.load(ef, strict=False)
# Ignore the kfp SDK & schema version during comparison
del executor_output_json['sdkVersion']
del executor_output_json['schemaVersion']
self.assertDictEqual(executor_output_json, expected_executor_output_json)
def test_dataproc_create_spark_r_batch_op_compile(self):
"""Compile a test pipeline using the Dataproc SparkRBatch component."""
@@ -179,21 +157,14 @@ class ComponentsCompileTest(unittest.TestCase):
args=self._batch_specific_args,
)
compiler.Compiler().compile(
pipeline_func=pipeline, package_path=self._package_path
utils.assert_pipeline_equals_golden(
self,
pipeline,
os.path.join(
os.path.dirname(__file__),
'../testdata/dataproc_create_spark_r_batch_component_pipeline.json',
),
)
with open(self._package_path) as f:
executor_output_json = json.load(f, strict=False)
with open(
'testdata/dataproc_create_spark_r_batch_component_pipeline.json'
) as ef:
expected_executor_output_json = json.load(ef, strict=False)
# Ignore the kfp SDK & schema version during comparison
del executor_output_json['sdkVersion']
del executor_output_json['schemaVersion']
self.assertDictEqual(executor_output_json, expected_executor_output_json)
def test_dataproc_create_spark_sql_batch_op_compile(self):
"""Compile a test pipeline using the Dataproc SparkSqlBatch component."""
@@ -219,19 +190,11 @@ class ComponentsCompileTest(unittest.TestCase):
jar_file_uris=self._jar_file_uris,
query_variables=self._query_variables,
)
compiler.Compiler().compile(
pipeline_func=pipeline, package_path=self._package_path
utils.assert_pipeline_equals_golden(
self,
pipeline,
os.path.join(
os.path.dirname(__file__),
'../testdata/dataproc_create_spark_sql_batch_component_pipeline.json',
),
)
with open(self._package_path) as f:
executor_output_json = json.load(f, strict=False)
with open(
'testdata/dataproc_create_spark_sql_batch_component_pipeline.json'
) as ef:
expected_executor_output_json = json.load(ef, strict=False)
# Ignore the kfp SDK & schema version during comparison
del executor_output_json['sdkVersion']
del executor_output_json['schemaVersion']
self.assertDictEqual(executor_output_json, expected_executor_output_json)

View File

@@ -13,46 +13,39 @@
# limitations under the License.
"""Test google-cloud-pipeline-Components to ensure the compile without error."""
import json
import os
import kfp
from google_cloud_pipeline_components.v1.automl.training_job import (
AutoMLForecastingTrainingJobRunOp,
AutoMLImageTrainingJobRunOp,
AutoMLTabularTrainingJobRunOp,
AutoMLTextTrainingJobRunOp,
AutoMLVideoTrainingJobRunOp,
)
from google_cloud_pipeline_components.v1.dataset import (
ImageDatasetCreateOp,
ImageDatasetExportDataOp,
ImageDatasetImportDataOp,
TabularDatasetCreateOp,
TabularDatasetExportDataOp,
TextDatasetCreateOp,
TextDatasetExportDataOp,
TextDatasetImportDataOp,
TimeSeriesDatasetCreateOp,
TimeSeriesDatasetExportDataOp,
VideoDatasetCreateOp,
VideoDatasetExportDataOp,
VideoDatasetImportDataOp,
)
from google_cloud_pipeline_components.v1.endpoint import (
EndpointCreateOp,
EndpointDeleteOp,
ModelDeployOp,
ModelUndeployOp,
)
from google_cloud_pipeline_components.v1.model import (
ModelDeleteOp,
ModelUploadOp,
ModelExportOp,
)
from google_cloud_pipeline_components.v1.automl.training_job import AutoMLForecastingTrainingJobRunOp
from google_cloud_pipeline_components.v1.automl.training_job import AutoMLImageTrainingJobRunOp
from google_cloud_pipeline_components.v1.automl.training_job import AutoMLTabularTrainingJobRunOp
from google_cloud_pipeline_components.v1.automl.training_job import AutoMLTextTrainingJobRunOp
from google_cloud_pipeline_components.v1.automl.training_job import AutoMLVideoTrainingJobRunOp
from google_cloud_pipeline_components.v1.batch_predict_job import ModelBatchPredictOp
from google_cloud_pipeline_components.v1.dataset import ImageDatasetCreateOp
from google_cloud_pipeline_components.v1.dataset import ImageDatasetExportDataOp
from google_cloud_pipeline_components.v1.dataset import ImageDatasetImportDataOp
from google_cloud_pipeline_components.v1.dataset import TabularDatasetCreateOp
from google_cloud_pipeline_components.v1.dataset import TabularDatasetExportDataOp
from google_cloud_pipeline_components.v1.dataset import TextDatasetCreateOp
from google_cloud_pipeline_components.v1.dataset import TextDatasetExportDataOp
from google_cloud_pipeline_components.v1.dataset import TextDatasetImportDataOp
from google_cloud_pipeline_components.v1.dataset import TimeSeriesDatasetCreateOp
from google_cloud_pipeline_components.v1.dataset import TimeSeriesDatasetExportDataOp
from google_cloud_pipeline_components.v1.dataset import VideoDatasetCreateOp
from google_cloud_pipeline_components.v1.dataset import VideoDatasetExportDataOp
from google_cloud_pipeline_components.v1.dataset import VideoDatasetImportDataOp
from google_cloud_pipeline_components.v1.endpoint import EndpointCreateOp
from google_cloud_pipeline_components.v1.endpoint import EndpointDeleteOp
from google_cloud_pipeline_components.v1.endpoint import ModelDeployOp
from google_cloud_pipeline_components.v1.endpoint import ModelUndeployOp
from google_cloud_pipeline_components.v1.model import ModelDeleteOp
from google_cloud_pipeline_components.v1.model import ModelExportOp
from google_cloud_pipeline_components.v1.model import ModelUploadOp
from google_cloud_pipeline_components.tests.v1 import utils
import kfp
from kfp import compiler
from kfp.dsl import Input, Artifact
from kfp.dsl import Artifact
from kfp.dsl import Input
import unittest
from google.cloud import aiplatform
@@ -114,17 +107,13 @@ class ComponentsCompileTest(unittest.TestCase):
dataset=dataset_create_op.outputs["dataset"],
import_schema_uri=aiplatform.schema.dataset.ioformat.image.single_label_classification,
)
compiler.Compiler().compile(
pipeline_func=pipeline, package_path=self._package_path
utils.assert_pipeline_equals_golden(
self,
pipeline,
os.path.join(
os.path.dirname(__file__), "testdata/automl_image_pipeline.json"
),
)
with open(self._package_path) as f:
executor_output_json = json.load(f, strict=False)
with open("testdata/automl_image_pipeline.json") as ef:
expected_executor_output_json = json.load(ef, strict=False)
# Ignore the kfp SDK version during comparison
del executor_output_json["sdkVersion"]
self.assertEqual(executor_output_json, expected_executor_output_json)
def test_automl_tabular_component_compile(self):
@kfp.dsl.pipeline(name="training-test")
@@ -194,17 +183,13 @@ class ComponentsCompileTest(unittest.TestCase):
dataset=dataset_create_op.outputs["dataset"],
import_schema_uri=aiplatform.schema.dataset.ioformat.text.multi_label_classification,
)
compiler.Compiler().compile(
pipeline_func=pipeline, package_path=self._package_path
utils.assert_pipeline_equals_golden(
self,
pipeline,
os.path.join(
os.path.dirname(__file__), "testdata/automl_text_pipeline.json"
),
)
with open(self._package_path) as f:
executor_output_json = json.load(f, strict=False)
with open("testdata/automl_text_pipeline.json") as ef:
expected_executor_output_json = json.load(ef, strict=False)
# Ignore the kfp SDK version during comparison
del executor_output_json["sdkVersion"]
self.assertEqual(executor_output_json, expected_executor_output_json)
def test_automl_video_component_compile(self):
@kfp.dsl.pipeline(name="training-test")
@@ -240,16 +225,13 @@ class ComponentsCompileTest(unittest.TestCase):
import_schema_uri=aiplatform.schema.dataset.ioformat.video.classification,
)
compiler.Compiler().compile(
pipeline_func=pipeline, package_path=self._package_path
utils.assert_pipeline_equals_golden(
self,
pipeline,
os.path.join(
os.path.dirname(__file__), "testdata/automl_video_pipeline.json"
),
)
with open(self._package_path) as f:
executor_output_json = json.load(f, strict=False)
with open("testdata/automl_video_pipeline.json") as ef:
expected_executor_output_json = json.load(ef, strict=False)
# Ignore the kfp SDK version during comparison
del executor_output_json["sdkVersion"]
self.assertEqual(executor_output_json, expected_executor_output_json)
def test_automl_forecasting_component_compile(self):
@kfp.dsl.pipeline(name="training-test")
@@ -311,20 +293,15 @@ class ComponentsCompileTest(unittest.TestCase):
_ = ModelDeleteOp(
model=model_upload_op.outputs["model"],
)
compiler.Compiler().compile(
pipeline_func=pipeline, package_path=self._package_path
utils.assert_pipeline_equals_golden(
self,
pipeline,
os.path.join(
os.path.dirname(__file__),
"testdata/model_upload_and_delete_pipeline.json",
),
)
with open(self._package_path) as f:
executor_output_json = json.load(f, strict=False)
with open("testdata/model_upload_and_delete_pipeline.json") as ef:
expected_executor_output_json = json.load(ef, strict=False)
# Ignore the kfp SDK & schema version during comparison
del executor_output_json["sdkVersion"]
del executor_output_json["schemaVersion"]
self.assertEqual(executor_output_json, expected_executor_output_json)
def test_create_endpoint_op_and_delete_endpoint_op_compile(self):
@kfp.dsl.pipeline(name="delete-endpoint-test")
def pipeline():
@@ -342,19 +319,15 @@ class ComponentsCompileTest(unittest.TestCase):
endpoint=create_endpoint_op.outputs["endpoint"]
)
compiler.Compiler().compile(
pipeline_func=pipeline, package_path=self._package_path
utils.assert_pipeline_equals_golden(
self,
pipeline,
os.path.join(
os.path.dirname(__file__),
"testdata/create_and_delete_endpoint_pipeline.json",
),
)
with open(self._package_path) as f:
executor_output_json = json.load(f, strict=False)
with open("testdata/create_and_delete_endpoint_pipeline.json") as ef:
expected_executor_output_json = json.load(ef, strict=False)
# Ignore the kfp SDK & schema version during comparison
del executor_output_json["sdkVersion"]
del executor_output_json["schemaVersion"]
self.assertEqual(executor_output_json, expected_executor_output_json)
def test_model_export_op_compile(self):
@kfp.dsl.pipeline(name="training-test")
def pipeline(unmanaged_container_model: Input[Artifact]):
@@ -370,20 +343,15 @@ class ComponentsCompileTest(unittest.TestCase):
artifact_destination="artifact_destination",
image_destination="image_destination",
)
compiler.Compiler().compile(
pipeline_func=pipeline, package_path=self._package_path
utils.assert_pipeline_equals_golden(
self,
pipeline,
os.path.join(
os.path.dirname(__file__),
"testdata/model_export_pipeline.json",
),
)
with open(self._package_path) as f:
executor_output_json = json.load(f, strict=False)
with open("testdata/model_export_pipeline.json") as ef:
expected_executor_output_json = json.load(ef, strict=False)
# Ignore the kfp SDK & schema version during comparison
del executor_output_json["sdkVersion"]
del executor_output_json["schemaVersion"]
self.assertEqual(executor_output_json, expected_executor_output_json)
def test_model_deploy_op_and_model_undeploy_op_compile(self):
@kfp.dsl.pipeline(name="training-test")
def pipeline(unmanaged_container_model: Input[Artifact]):
@@ -421,19 +389,15 @@ class ComponentsCompileTest(unittest.TestCase):
endpoint=create_endpoint_op.outputs["endpoint"],
).after(model_deploy_op)
compiler.Compiler().compile(
pipeline_func=pipeline, package_path=self._package_path
utils.assert_pipeline_equals_golden(
self,
pipeline,
os.path.join(
os.path.dirname(__file__),
"testdata/model_deploy_and_undeploy_pipeline.json",
),
)
with open(self._package_path) as f:
executor_output_json = json.load(f, strict=False)
with open("testdata/model_deploy_and_undeploy_pipeline.json") as ef:
expected_executor_output_json = json.load(ef, strict=False)
# Ignore the kfp SDK & schema version during comparison
del executor_output_json["sdkVersion"]
del executor_output_json["schemaVersion"]
self.assertEqual(executor_output_json, expected_executor_output_json)
def test_batch_prediction_op_compile(self):
@kfp.dsl.pipeline(name="training-test")
def pipeline(unmanaged_container_model: Input[Artifact]):
@@ -468,14 +432,11 @@ class ComponentsCompileTest(unittest.TestCase):
labels={"foo": "bar"},
)
compiler.Compiler().compile(
pipeline_func=pipeline, package_path=self._package_path
utils.assert_pipeline_equals_golden(
self,
pipeline,
os.path.join(
os.path.dirname(__file__),
"testdata/batch_prediction_pipeline.json",
),
)
with open(self._package_path) as f:
executor_output_json = json.load(f, strict=False)
with open("testdata/batch_prediction_pipeline.json") as ef:
expected_executor_output_json = json.load(ef, strict=False)
# Ignore the kfp SDK version during comparison
del executor_output_json["sdkVersion"]
self.assertEqual(executor_output_json, expected_executor_output_json)

View File

@@ -0,0 +1,133 @@
# Copyright 2023 The Kubeflow Authors. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Utilities for testing."""
import copy
import json
from typing import Any, Optional, Set
from google.protobuf import json_format
import unittest
def assert_pipeline_equals_golden(
test_case: unittest.TestCase,
compilable,
comparison_file: str,
) -> None:
"""Compare a compilable pipeline/component against a golden snapshot comparison file containing a known valid PipelineSpec.
Permits the compiled output to have a schema that has migrated past the
comparison file, and skips fields (such as sdkVersion) that should not
affect equality.
Args:
test_case: An instance of a TestCase.
compilable: Pipeline/component.
comparison_file: Path to a JSON golden snapshot of the compiled
PipelineSpec.
"""
with open(comparison_file) as f:
expected_pipeline_spec_dict = json.load(f)
actual_pipeline_spec_dict = json_format.MessageToDict(
compilable.pipeline_spec
)
# sdkVersion changes with every KFP SDK release, so exclude it from the
# comparison.
ignore_fields = {'sdkVersion'}
compare_pipeline_spec_dicts(
test_case,
actual_pipeline_spec_dict,
expected_pipeline_spec_dict,
comparison_file,
ignore_fields,
)
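
For reference, a minimal usage sketch of this helper; the component, pipeline, and golden file below are hypothetical, not part of this commit:

import os
import unittest

import kfp
from google_cloud_pipeline_components.tests.v1 import utils


@kfp.dsl.component
def echo(text: str) -> str:
  return text


class ExamplePipelineCompileTest(unittest.TestCase):

  def test_example_pipeline_compiles(self):
    @kfp.dsl.pipeline(name='example-test')
    def pipeline():
      echo(text='hello')

    # The decorated pipeline exposes .pipeline_spec, which the helper
    # converts with json_format.MessageToDict and compares to the golden file.
    utils.assert_pipeline_equals_golden(
        self,
        pipeline,
        os.path.join(os.path.dirname(__file__), 'testdata/example_pipeline.json'),
    )
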
def compare_pipeline_spec_dicts(
test_case: unittest.TestCase,
actual: dict,
expected: dict,
comparison_file: Optional[str] = None,
ignore_fields: Optional[Set[str]] = None,
) -> None:
"""Compares two PipelineSpec dictionaries.
Permits actual to have a proto schema that has evolved ahead of expected: if
the KFP SDK adds a field to PipelineSpec but the golden snapshot hasn't been
updated, the test case will not fail. Prints out the actual JSON for easy
copy-paste into the golden snapshot file. Note that actual and expected are
treated asymmetrically and are not interchangeable.
Args:
test_case: An instance of a TestCase.
actual: Actual PipelineSpec as a dict.
expected: Expected PipelineSpec as a dict.
comparison_file: Path to a JSON golden snapshot of the compiled
PipelineSpec.
ignore_fields: If a field's key is in ignore_fields it will not be used to
assert equality.
"""
original_actual = copy.deepcopy(actual)
# Default to an empty set so the membership checks below are safe.
ignore_fields = ignore_fields or set()
def make_copypaste_message(actual_pipeline_spec_json: dict) -> str:
return (
'\n\nTo update the JSON to the new version, copy and paste the'
f' following into the golden snapshot file {comparison_file or ""}. Be'
' sure the change is what you expect.\n'
+ json.dumps(
actual_pipeline_spec_json,
)
)
def compare_json_dicts(
test_case: unittest.TestCase,
actual: Any,
expected: Any,
) -> None:
if type(actual) is not type(expected):
test_case.assertEqual(
type(actual),
type(expected),
f'Types do not match: {type(actual)} != {type(expected)}'
+ make_copypaste_message(original_actual),
)
if isinstance(actual, dict):
for key in expected:
if key in ignore_fields:
continue
test_case.assertIn(
key,
actual,
f'Key "{key}" not found in first json object'
+ make_copypaste_message(original_actual),
)
compare_json_dicts(test_case, actual[key], expected[key])
elif isinstance(actual, list):
test_case.assertEqual(
len(actual),
len(expected),
'Lists are of different lengths'
+ make_copypaste_message(original_actual),
)
for i in range(len(actual)):
compare_json_dicts(test_case, actual[i], expected[i])
else:
test_case.assertEqual(
actual,
expected,
f'Values do not match: {actual} != {expected}'
+ make_copypaste_message(original_actual),
)
compare_json_dicts(test_case, actual, expected)
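
To make the asymmetry concrete, a small sketch of hypothetical test cases, assuming they live alongside compare_pipeline_spec_dicts above: a field added only in actual passes, while a field present only in expected fails.

class CompareAsymmetryTest(unittest.TestCase):

  def test_actual_ahead_of_expected_passes(self):
    expected = {'schemaVersion': '2.1.0'}
    actual = {'schemaVersion': '2.1.0', 'newField': 'added by a newer SDK'}
    # Only keys present in expected are checked, so the extra key is ignored.
    compare_pipeline_spec_dicts(self, actual, expected)

  def test_expected_ahead_of_actual_fails(self):
    expected = {'schemaVersion': '2.1.0', 'onlyInGolden': 'true'}
    actual = {'schemaVersion': '2.1.0'}
    # The golden snapshot may not contain keys missing from the compiled
    # output, so this raises an AssertionError.
    with self.assertRaises(AssertionError):
      compare_pipeline_spec_dicts(self, actual, expected)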