From 1fa0893800f8f7615a036971d5e6c7d1b02e5829 Mon Sep 17 00:00:00 2001
From: rd-pong
Date: Fri, 12 May 2023 15:29:29 -0700
Subject: [PATCH] test(component): Update integration test for Model Monitor component (#9384)

* Add test that checks component outputs

* Remove sagemaker check

* Extract get_output_ack_resource_metadata to a function

Extract "Scheduled" to constant FINAL_STATUS

* Extract to function: verify_monitoring_schedule_component_outputs
---
 ..._v2_monitoring_job_definition_component.py |  30 ++-
 .../test_v2_monitoring_schedule_component.py  | 175 ++++++++++++------
 .../utils/kfp_client_utils.py                 |   9 +
 3 files changed, 151 insertions(+), 63 deletions(-)

diff --git a/components/aws/sagemaker/tests/integration_tests/component_tests/test_v2_monitoring_job_definition_component.py b/components/aws/sagemaker/tests/integration_tests/component_tests/test_v2_monitoring_job_definition_component.py
index ce05e269ca..2e8eed1f42 100644
--- a/components/aws/sagemaker/tests/integration_tests/component_tests/test_v2_monitoring_job_definition_component.py
+++ b/components/aws/sagemaker/tests/integration_tests/component_tests/test_v2_monitoring_job_definition_component.py
@@ -1,9 +1,9 @@
-import time
 import pytest
 import os
 import utils
 from utils import kfp_client_utils
 from utils import ack_utils
+from utils import minio_utils
 
 
 # Testing data quality job definition component and model explainability job definition component
@@ -44,7 +44,7 @@ def test_job_definitions(kfp_client, experiment_id, test_file_dir, deploy_endpoi
     ] = deploy_endpoint
 
     try:
-        _, _, _ = kfp_client_utils.compile_run_monitor_pipeline(
+        _, _, workflow_json = kfp_client_utils.compile_run_monitor_pipeline(
             kfp_client,
             experiment_id,
             test_params["PipelineDefinition"],
@@ -54,16 +54,38 @@ def test_job_definitions(kfp_client, experiment_id, test_file_dir, deploy_endpoi
             test_params["Timeout"],
         )
 
+        # Verify if the job definition CR is created
         job_definition_describe = ack_utils._get_resource(
             k8s_client, job_definition_name, test_params["Plural"]
         )
-
-        # Check if the job definition is created
         assert (
             job_definition_name
             in job_definition_describe["status"]["ackResourceMetadata"]["arn"]
         )
 
+        # Verify component output
+        step_name = "sagemaker-" + test_params["Plural"][:-1]
+        outputs = {
+            step_name: [
+                "ack_resource_metadata",
+                "sagemaker_resource_name",
+            ]
+        }
+
+        output_files = minio_utils.artifact_download_iterator(
+            workflow_json, outputs, download_dir
+        )
+
+        output_ack_resource_metadata = (
+            kfp_client_utils.get_output_ack_resource_metadata(output_files, step_name)
+        )
+        output_resource_name = utils.read_from_file_in_tar(
+            output_files[step_name]["sagemaker_resource_name"]
+        )
+
+        assert job_definition_name in output_ack_resource_metadata["arn"]
+        assert output_resource_name == job_definition_name
+
     finally:
         ack_utils._delete_resource(
             k8s_client, job_definition_name, test_params["Plural"]
diff --git a/components/aws/sagemaker/tests/integration_tests/component_tests/test_v2_monitoring_schedule_component.py b/components/aws/sagemaker/tests/integration_tests/component_tests/test_v2_monitoring_schedule_component.py
index abf472353d..7edd7bd853 100644
--- a/components/aws/sagemaker/tests/integration_tests/component_tests/test_v2_monitoring_schedule_component.py
+++ b/components/aws/sagemaker/tests/integration_tests/component_tests/test_v2_monitoring_schedule_component.py
@@ -1,10 +1,42 @@
-import time
 import pytest
 import os
 import utils
 from utils import kfp_client_utils
 from utils import ack_utils
-from utils import sagemaker_utils
+from utils import minio_utils
+
+FINAL_STATUS = "Scheduled"
+
+
+def verify_monitoring_schedule_component_outputs(
+    workflow_json, download_dir, monitoring_schedule_name
+):
+    # Verify component outputs
+    outputs = {
+        "sagemaker-monitoringschedule": [
+            "ack_resource_metadata",
+            "monitoring_schedule_status",
+            "sagemaker_resource_name",
+        ]
+    }
+
+    output_files = minio_utils.artifact_download_iterator(
+        workflow_json, outputs, download_dir
+    )
+
+    output_ack_resource_metadata = kfp_client_utils.get_output_ack_resource_metadata(
+        output_files, "sagemaker-monitoringschedule"
+    )
+    output_schedule_status = utils.read_from_file_in_tar(
+        output_files["sagemaker-monitoringschedule"]["monitoring_schedule_status"]
+    )
+    output_schedule_name = utils.read_from_file_in_tar(
+        output_files["sagemaker-monitoringschedule"]["sagemaker_resource_name"]
+    )
+
+    assert monitoring_schedule_name in output_ack_resource_metadata["arn"]
+    assert output_schedule_name == monitoring_schedule_name
+    assert output_schedule_status == FINAL_STATUS
 
 
 # Testing monitoring schedule with model bias job definition
@@ -17,8 +49,8 @@ from utils import sagemaker_utils
         ),
     ],
 )
-def test_v2_monitoring_schedule(
-    kfp_client, experiment_id, test_file_dir, deploy_endpoint, sagemaker_client
+def test_create_v2_monitoring_schedule(
+    kfp_client, experiment_id, test_file_dir, deploy_endpoint
 ):
     download_dir = utils.mkdir(os.path.join(test_file_dir + "/generated"))
     test_params = utils.load_params(
@@ -48,7 +80,7 @@ def test_v2_monitoring_schedule(
     ] = job_definition_name
 
     try:
-        _, _, _ = kfp_client_utils.compile_run_monitor_pipeline(
+        _, _, workflow_json = kfp_client_utils.compile_run_monitor_pipeline(
             kfp_client,
             experiment_id,
             test_params["PipelineDefinition"],
@@ -58,25 +90,37 @@ def test_v2_monitoring_schedule(
             test_params["Timeout"],
         )
 
+        # Verify if the job definition CR is created properly
         job_definition_describe = ack_utils._get_resource(
             k8s_client, job_definition_name, "modelbiasjobdefinitions"
         )
+        assert (
+            job_definition_name
+            in job_definition_describe["status"]["ackResourceMetadata"]["arn"]
+        )
+        assert (
+            job_definition_describe["spec"]["modelBiasJobInput"]["endpointInput"][
+                "endpointName"
+            ]
+            == deploy_endpoint
+        )
-
-        # Check if the job definition is created
-        assert job_definition_describe["status"]["ackResourceMetadata"]["arn"] != None
-
-        # Verify if monitoring schedule is created with correct name and endpoint
-        monitoring_schedule_describe = sagemaker_utils.describe_monitoring_schedule(
-            sagemaker_client, monitoring_schedule_name
+
+        # Verify if monitoring schedule CR is created properly
+        monitoring_schedule_describe = ack_utils._get_resource(
+            k8s_client, monitoring_schedule_name, "monitoringschedules"
         )
         assert (
             monitoring_schedule_name
-            in monitoring_schedule_describe["MonitoringScheduleArn"]
+            in monitoring_schedule_describe["status"]["ackResourceMetadata"]["arn"]
+        )
+        assert (
+            monitoring_schedule_describe["status"]["monitoringScheduleStatus"]
+            == FINAL_STATUS
        )
 
-        assert monitoring_schedule_describe["MonitoringScheduleStatus"] == "Scheduled"
-
-        assert monitoring_schedule_describe["EndpointName"] == deploy_endpoint
+        verify_monitoring_schedule_component_outputs(
+            workflow_json, download_dir, monitoring_schedule_name
+        )
 
     finally:
         ack_utils._delete_resource(
@@ -110,8 +154,8 @@ def test_v2_monitoring_schedule(
         ),
     ],
 )
-def test_v2_monitoring_schedule_update(
-    kfp_client, experiment_id, test_file_dir, deploy_endpoint, sagemaker_client
+def test_update_v2_monitoring_schedule(
+    kfp_client, experiment_id, test_file_dir, deploy_endpoint
 ):
     download_dir = utils.mkdir(os.path.join(test_file_dir + "/generated"))
     test_params = utils.load_params(
@@ -146,7 +190,7 @@ def test_v2_monitoring_schedule_update(
         ] = "ml.m5.large"
         test_params["Arguments"]["monitoring_schedule_name"] = monitoring_schedule_name
 
-        _, _, _ = kfp_client_utils.compile_run_monitor_pipeline(
+        _, _, workflow_json = kfp_client_utils.compile_run_monitor_pipeline(
             kfp_client,
             experiment_id,
             test_params["PipelineDefinition"],
@@ -156,54 +200,57 @@ def test_v2_monitoring_schedule_update(
             test_params["Timeout"],
         )
 
-        job_definition_1_describe_ack = ack_utils._get_resource(
-            k8s_client, job_definition_name_1, test_params["Plural"]
-        )
-
-        # Check if the job definition is created
-        assert (
-            job_definition_1_describe_ack["status"]["ackResourceMetadata"]["arn"]
-            != None
-        )
-
-        # Verify if monitoring schedule is created with correct name and endpoint
-        monitoring_schedule_describe = sagemaker_utils.describe_monitoring_schedule(
-            sagemaker_client, monitoring_schedule_name
+        # Verify if monitoring schedule CR is created properly
+        monitoring_schedule_describe = ack_utils._get_resource(
+            k8s_client, monitoring_schedule_name, "monitoringschedules"
         )
         assert (
             monitoring_schedule_name
-            in monitoring_schedule_describe["MonitoringScheduleArn"]
+            in monitoring_schedule_describe["status"]["ackResourceMetadata"]["arn"]
         )
-
-        assert monitoring_schedule_describe["MonitoringScheduleStatus"] == "Scheduled"
-
-        assert monitoring_schedule_describe["EndpointName"] == deploy_endpoint
-
         assert (
-            monitoring_schedule_describe["MonitoringScheduleConfig"][
-                "MonitoringJobDefinitionName"
+            monitoring_schedule_describe["status"]["monitoringScheduleStatus"]
+            == FINAL_STATUS
+        )
+        assert (
+            monitoring_schedule_describe["spec"]["monitoringScheduleConfig"][
+                "monitoringJobDefinitionName"
             ]
             == job_definition_name_1
         )
 
-        # Verify if job definition is created with correct instance type
-        job_definition_1_describe = (
-            sagemaker_utils.describe_data_quality_job_definition(
-                sagemaker_client, job_definition_name_1
-            )
+        # Verify if the job definition CR is created properly
+        job_definition_1_describe = ack_utils._get_resource(
+            k8s_client, job_definition_name_1, "dataqualityjobdefinitions"
        )
         assert (
-            job_definition_1_describe["JobResources"]["ClusterConfig"]["InstanceType"]
+            job_definition_name_1
+            in job_definition_1_describe["status"]["ackResourceMetadata"]["arn"]
+        )
+        assert (
+            job_definition_1_describe["spec"]["dataQualityJobInput"]["endpointInput"][
+                "endpointName"
+            ]
+            == deploy_endpoint
+        )
+        assert (
+            job_definition_1_describe["spec"]["jobResources"]["clusterConfig"][
+                "instanceType"
+            ]
             == "ml.m5.large"
         )
 
+        verify_monitoring_schedule_component_outputs(
+            workflow_json, download_dir, monitoring_schedule_name
+        )
+
         # Update monitoring schedule using new job definition
         test_params["Arguments"]["job_definition_name"] = job_definition_name_2
         test_params["Arguments"]["job_resources"]["clusterConfig"][
             "instanceType"
         ] = "ml.m5.xlarge"
 
-        _, _, _ = kfp_client_utils.compile_run_monitor_pipeline(
+        _, _, workflow_json = kfp_client_utils.compile_run_monitor_pipeline(
             kfp_client,
             experiment_id,
             test_params["PipelineDefinition"],
@@ -213,30 +260,40 @@ def test_v2_monitoring_schedule_update(
             test_params["Timeout"],
         )
 
-        monitoring_schedule_updated_describe = (
-            sagemaker_utils.describe_monitoring_schedule(
-                sagemaker_client, monitoring_schedule_name
-            )
+        # Verify if monitoring schedule is updated with correct job definition
+        monitoring_schedule_updated_describe = ack_utils._get_resource(
+            k8s_client, monitoring_schedule_name, "monitoringschedules"
         )
-
         assert (
-            monitoring_schedule_updated_describe["MonitoringScheduleConfig"][
-                "MonitoringJobDefinitionName"
+            monitoring_schedule_updated_describe["status"]["monitoringScheduleStatus"]
+            == FINAL_STATUS
+        )
+        assert (
+            monitoring_schedule_updated_describe["spec"]["monitoringScheduleConfig"][
+                "monitoringJobDefinitionName"
             ]
             == job_definition_name_2
         )
 
-        # Verify if job definition is created with correct instance type
-        job_definition_2_describe = (
-            sagemaker_utils.describe_data_quality_job_definition(
-                sagemaker_client, job_definition_name_2
-            )
+        # Verify if the new job definition CR is created properly
+        job_definition_2_describe = ack_utils._get_resource(
+            k8s_client, job_definition_name_2, "dataqualityjobdefinitions"
         )
         assert (
-            job_definition_2_describe["JobResources"]["ClusterConfig"]["InstanceType"]
+            job_definition_name_2
+            in job_definition_2_describe["status"]["ackResourceMetadata"]["arn"]
+        )
+        assert (
+            job_definition_2_describe["spec"]["jobResources"]["clusterConfig"][
+                "instanceType"
+            ]
             == "ml.m5.xlarge"
         )
 
+        verify_monitoring_schedule_component_outputs(
+            workflow_json, download_dir, monitoring_schedule_name
+        )
+
     finally:
         ack_utils._delete_resource(
             k8s_client,
diff --git a/components/aws/sagemaker/tests/integration_tests/utils/kfp_client_utils.py b/components/aws/sagemaker/tests/integration_tests/utils/kfp_client_utils.py
index afbc54937e..a473fb8817 100644
--- a/components/aws/sagemaker/tests/integration_tests/utils/kfp_client_utils.py
+++ b/components/aws/sagemaker/tests/integration_tests/utils/kfp_client_utils.py
@@ -1,3 +1,4 @@
+import json
 import os
 import utils
 import pytest
@@ -75,3 +76,11 @@ def compile_run_monitor_pipeline(
 def terminate_run(client, run_id):
     client.runs.terminate_run(run_id)
     wait_for_job_status(client, run_id, 30, "failed")
+
+
+def get_output_ack_resource_metadata(output_files, step_name):
+    return json.loads(
+        utils.read_from_file_in_tar(
+            output_files[step_name]["ack_resource_metadata"]
+        ).replace("'", '"')
+    )
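
A minimal sketch of the parsing done by the new get_output_ack_resource_metadata
helper, for reviewers. The ARN payload below is a hypothetical placeholder; in the
tests the input comes from the artifact path that minio_utils.artifact_download_iterator
returns as output_files[step_name]["ack_resource_metadata"].

    import json

    # The downloaded artifact stores the ACK resource metadata as a Python-style
    # dict string (single quotes). Swapping the quotes makes it valid JSON, which
    # is what the helper does after reading the file out of the tar archive.
    raw = "{'arn': 'arn:aws:sagemaker:us-east-1:111122223333:monitoring-schedule/my-schedule'}"
    metadata = json.loads(raw.replace("'", '"'))

    assert metadata["arn"].endswith("monitoring-schedule/my-schedule")

The quote swap assumes the serialized metadata contains no apostrophes of its own,
which appears to hold for the ARN and resource-name fields these tests assert on.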