test(component): Update integration test for Model Monitor component (#9384)

* Add test that checks component outputs

* Remove the SageMaker client checks (verify through the ACK custom resources instead)

* Extract get_output_ack_resource_metadata into a helper function
Extract "Scheduled" into the constant FINAL_STATUS

* Extract verify_monitoring_schedule_component_outputs into a helper function
Authored by rd-pong on 2023-05-12 15:29:29 -07:00, committed via GitHub
parent 09d5151542
commit 1fa0893800
3 changed files with 151 additions and 63 deletions
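
The tests now verify component outputs directly rather than calling the SageMaker API: after the pipeline run, the step's artifacts are downloaded from MinIO using the workflow JSON, the ack_resource_metadata artifact is parsed, and the ARN and resource name (and, for schedules, the status) are asserted. A minimal sketch of that pattern, not part of the commit; the helper name here is illustrative (the commit's own helper is verify_monitoring_schedule_component_outputs), and it assumes the repo's test utilities (utils, minio_utils, kfp_client_utils) are importable and that workflow_json, download_dir, step_name, and resource_name are supplied by the surrounding test:

import utils
from utils import kfp_client_utils
from utils import minio_utils


def verify_component_outputs(workflow_json, download_dir, step_name, resource_name):
    # Download the artifacts the component step produced during the pipeline run.
    outputs = {step_name: ["ack_resource_metadata", "sagemaker_resource_name"]}
    output_files = minio_utils.artifact_download_iterator(
        workflow_json, outputs, download_dir
    )
    # ack_resource_metadata carries the ARN of the resource the ACK controller created.
    metadata = kfp_client_utils.get_output_ack_resource_metadata(output_files, step_name)
    resource_name_output = utils.read_from_file_in_tar(
        output_files[step_name]["sagemaker_resource_name"]
    )
    assert resource_name in metadata["arn"]
    assert resource_name_output == resource_name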


@@ -1,9 +1,9 @@
import time
import pytest
import os
import utils
from utils import kfp_client_utils
from utils import ack_utils
from utils import minio_utils
# Testing data quality job definition component and model explainability job definition component
@@ -44,7 +44,7 @@ def test_job_definitions(kfp_client, experiment_id, test_file_dir, deploy_endpoi
] = deploy_endpoint
try:
_, _, _ = kfp_client_utils.compile_run_monitor_pipeline(
_, _, workflow_json = kfp_client_utils.compile_run_monitor_pipeline(
kfp_client,
experiment_id,
test_params["PipelineDefinition"],
@@ -54,16 +54,38 @@ def test_job_definitions(kfp_client, experiment_id, test_file_dir, deploy_endpoi
test_params["Timeout"],
)
# Verify if the job definition CR is created
job_definition_describe = ack_utils._get_resource(
k8s_client, job_definition_name, test_params["Plural"]
)
# Check if the job definition is created
assert (
job_definition_name
in job_definition_describe["status"]["ackResourceMetadata"]["arn"]
)
# Verify component output
step_name = "sagemaker-" + test_params["Plural"][:-1]
outputs = {
step_name: [
"ack_resource_metadata",
"sagemaker_resource_name",
]
}
output_files = minio_utils.artifact_download_iterator(
workflow_json, outputs, download_dir
)
output_ack_resource_metadata = (
kfp_client_utils.get_output_ack_resource_metadata(output_files, step_name)
)
output_resource_name = utils.read_from_file_in_tar(
output_files[step_name]["sagemaker_resource_name"]
)
assert job_definition_name in output_ack_resource_metadata["arn"]
assert output_resource_name == job_definition_name
finally:
ack_utils._delete_resource(
k8s_client, job_definition_name, test_params["Plural"]
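
A note on the step name above: the component's Argo step is named "sagemaker-" plus the singular form of the CR plural, which is why the test strips the trailing "s" from test_params["Plural"]. A tiny illustration with a hypothetical parameter value:

plural = "dataqualityjobdefinitions"      # example value for test_params["Plural"]
step_name = "sagemaker-" + plural[:-1]    # drop the trailing "s" to get the step name
assert step_name == "sagemaker-dataqualityjobdefinition"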


@@ -1,10 +1,42 @@
import time
import pytest
import os
import utils
from utils import kfp_client_utils
from utils import ack_utils
from utils import sagemaker_utils
from utils import minio_utils
FINAL_STATUS = "Scheduled"
def verify_monitoring_schedule_component_outputs(
workflow_json, download_dir, monitoring_schedule_name
):
# Verify component outputs
outputs = {
"sagemaker-monitoringschedule": [
"ack_resource_metadata",
"monitoring_schedule_status",
"sagemaker_resource_name",
]
}
output_files = minio_utils.artifact_download_iterator(
workflow_json, outputs, download_dir
)
output_ack_resource_metadata = kfp_client_utils.get_output_ack_resource_metadata(
output_files, "sagemaker-monitoringschedule"
)
output_schedule_status = utils.read_from_file_in_tar(
output_files["sagemaker-monitoringschedule"]["monitoring_schedule_status"]
)
output_schedule_name = utils.read_from_file_in_tar(
output_files["sagemaker-monitoringschedule"]["sagemaker_resource_name"]
)
assert monitoring_schedule_name in output_ack_resource_metadata["arn"]
assert output_schedule_name == monitoring_schedule_name
assert output_schedule_status == FINAL_STATUS
# Testing monitoring schedule with model bias job definition
@@ -17,8 +49,8 @@ from utils import sagemaker_utils
),
],
)
def test_v2_monitoring_schedule(
kfp_client, experiment_id, test_file_dir, deploy_endpoint, sagemaker_client
def test_create_v2_monitoring_schedule(
kfp_client, experiment_id, test_file_dir, deploy_endpoint
):
download_dir = utils.mkdir(os.path.join(test_file_dir + "/generated"))
test_params = utils.load_params(
@@ -48,7 +80,7 @@ def test_v2_monitoring_schedule(
] = job_definition_name
try:
_, _, _ = kfp_client_utils.compile_run_monitor_pipeline(
_, _, workflow_json = kfp_client_utils.compile_run_monitor_pipeline(
kfp_client,
experiment_id,
test_params["PipelineDefinition"],
@@ -58,25 +90,37 @@ def test_v2_monitoring_schedule(
test_params["Timeout"],
)
# Verify if the job definition CR is created properly
job_definition_describe = ack_utils._get_resource(
k8s_client, job_definition_name, "modelbiasjobdefinitions"
)
assert (
job_definition_name
in job_definition_describe["status"]["ackResourceMetadata"]["arn"]
)
assert (
job_definition_describe["spec"]["modelBiasJobInput"]["endpointInput"][
"endpointName"
]
== deploy_endpoint
)
# Check if the job definition is created
assert job_definition_describe["status"]["ackResourceMetadata"]["arn"] != None
# Verify if monitoring schedule is created with correct name and endpoint
monitoring_schedule_describe = sagemaker_utils.describe_monitoring_schedule(
sagemaker_client, monitoring_schedule_name
# Verify if monitoring schedule CR is created properly
monitoring_schedule_describe = ack_utils._get_resource(
k8s_client, monitoring_schedule_name, "monitoringschedules"
)
assert (
monitoring_schedule_name
in monitoring_schedule_describe["MonitoringScheduleArn"]
in monitoring_schedule_describe["status"]["ackResourceMetadata"]["arn"]
)
assert (
monitoring_schedule_describe["status"]["monitoringScheduleStatus"]
== FINAL_STATUS
)
assert monitoring_schedule_describe["MonitoringScheduleStatus"] == "Scheduled"
assert monitoring_schedule_describe["EndpointName"] == deploy_endpoint
verify_monitoring_schedule_component_outputs(
workflow_json, download_dir, monitoring_schedule_name
)
finally:
ack_utils._delete_resource(
@@ -110,8 +154,8 @@ def test_v2_monitoring_schedule(
),
],
)
def test_v2_monitoring_schedule_update(
kfp_client, experiment_id, test_file_dir, deploy_endpoint, sagemaker_client
def test_update_v2_monitoring_schedule(
kfp_client, experiment_id, test_file_dir, deploy_endpoint
):
download_dir = utils.mkdir(os.path.join(test_file_dir + "/generated"))
test_params = utils.load_params(
@@ -146,7 +190,7 @@ def test_v2_monitoring_schedule_update(
] = "ml.m5.large"
test_params["Arguments"]["monitoring_schedule_name"] = monitoring_schedule_name
_, _, _ = kfp_client_utils.compile_run_monitor_pipeline(
_, _, workflow_json = kfp_client_utils.compile_run_monitor_pipeline(
kfp_client,
experiment_id,
test_params["PipelineDefinition"],
@@ -156,54 +200,57 @@ def test_v2_monitoring_schedule_update(
test_params["Timeout"],
)
job_definition_1_describe_ack = ack_utils._get_resource(
k8s_client, job_definition_name_1, test_params["Plural"]
)
# Check if the job definition is created
assert (
job_definition_1_describe_ack["status"]["ackResourceMetadata"]["arn"]
!= None
)
# Verify if monitoring schedule is created with correct name and endpoint
monitoring_schedule_describe = sagemaker_utils.describe_monitoring_schedule(
sagemaker_client, monitoring_schedule_name
# Verify if monitoring schedule CR is created properly
monitoring_schedule_describe = ack_utils._get_resource(
k8s_client, monitoring_schedule_name, "monitoringschedules"
)
assert (
monitoring_schedule_name
in monitoring_schedule_describe["MonitoringScheduleArn"]
in monitoring_schedule_describe["status"]["ackResourceMetadata"]["arn"]
)
assert monitoring_schedule_describe["MonitoringScheduleStatus"] == "Scheduled"
assert monitoring_schedule_describe["EndpointName"] == deploy_endpoint
assert (
monitoring_schedule_describe["MonitoringScheduleConfig"][
"MonitoringJobDefinitionName"
monitoring_schedule_describe["status"]["monitoringScheduleStatus"]
== FINAL_STATUS
)
assert (
monitoring_schedule_describe["spec"]["monitoringScheduleConfig"][
"monitoringJobDefinitionName"
]
== job_definition_name_1
)
# Verify if job definition is created with correct instance type
job_definition_1_describe = (
sagemaker_utils.describe_data_quality_job_definition(
sagemaker_client, job_definition_name_1
)
# Verify if the job definition CR is created properly
job_definition_1_describe = ack_utils._get_resource(
k8s_client, job_definition_name_1, "dataqualityjobdefinitions"
)
assert (
job_definition_1_describe["JobResources"]["ClusterConfig"]["InstanceType"]
job_definition_name_1
in job_definition_1_describe["status"]["ackResourceMetadata"]["arn"]
)
assert (
job_definition_1_describe["spec"]["dataQualityJobInput"]["endpointInput"][
"endpointName"
]
== deploy_endpoint
)
assert (
job_definition_1_describe["spec"]["jobResources"]["clusterConfig"][
"instanceType"
]
== "ml.m5.large"
)
verify_monitoring_schedule_component_outputs(
workflow_json, download_dir, monitoring_schedule_name
)
# Update monitoring schedule using new job definition
test_params["Arguments"]["job_definition_name"] = job_definition_name_2
test_params["Arguments"]["job_resources"]["clusterConfig"][
"instanceType"
] = "ml.m5.xlarge"
_, _, _ = kfp_client_utils.compile_run_monitor_pipeline(
_, _, workflow_json = kfp_client_utils.compile_run_monitor_pipeline(
kfp_client,
experiment_id,
test_params["PipelineDefinition"],
@@ -213,30 +260,40 @@ def test_v2_monitoring_schedule_update(
test_params["Timeout"],
)
monitoring_schedule_updated_describe = (
sagemaker_utils.describe_monitoring_schedule(
sagemaker_client, monitoring_schedule_name
)
# Verify if monitoring schedule is updated with correct job definition
monitoring_schedule_updated_describe = ack_utils._get_resource(
k8s_client, monitoring_schedule_name, "monitoringschedules"
)
assert (
monitoring_schedule_updated_describe["MonitoringScheduleConfig"][
"MonitoringJobDefinitionName"
monitoring_schedule_updated_describe["status"]["monitoringScheduleStatus"]
== FINAL_STATUS
)
assert (
monitoring_schedule_updated_describe["spec"]["monitoringScheduleConfig"][
"monitoringJobDefinitionName"
]
== job_definition_name_2
)
# Verify if job definition is created with correct instance type
job_definition_2_describe = (
sagemaker_utils.describe_data_quality_job_definition(
sagemaker_client, job_definition_name_2
)
# Verify if the new job definition CR is created properly
job_definition_2_describe = ack_utils._get_resource(
k8s_client, job_definition_name_2, "dataqualityjobdefinitions"
)
assert (
job_definition_2_describe["JobResources"]["ClusterConfig"]["InstanceType"]
job_definition_name_2
in job_definition_2_describe["status"]["ackResourceMetadata"]["arn"]
)
assert (
job_definition_2_describe["spec"]["jobResources"]["clusterConfig"][
"instanceType"
]
== "ml.m5.xlarge"
)
verify_monitoring_schedule_component_outputs(
workflow_json, download_dir, monitoring_schedule_name
)
finally:
ack_utils._delete_resource(
k8s_client,


@@ -1,3 +1,4 @@
import json
import os
import utils
import pytest
@@ -75,3 +76,11 @@ def compile_run_monitor_pipeline(
def terminate_run(client, run_id):
client.runs.terminate_run(run_id)
wait_for_job_status(client, run_id, 30, "failed")
def get_output_ack_resource_metadata(output_files, step_name):
return json.loads(
utils.read_from_file_in_tar(
output_files[step_name]["ack_resource_metadata"]
).replace("'", '"')
)
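
The quote replacement in get_output_ack_resource_metadata is there because the component apparently writes ack_resource_metadata as a Python-style dict string with single quotes, which json.loads rejects. A runnable illustration of that normalization, with a made-up ARN:

import json

# Made-up example of an ack_resource_metadata artifact payload: a single-quoted
# dict string rather than valid JSON (the ARN below is fabricated for illustration).
raw = "{'arn': 'arn:aws:sagemaker:us-east-1:111122223333:monitoring-schedule/my-schedule'}"

metadata = json.loads(raw.replace("'", '"'))  # same normalization the helper applies
assert "my-schedule" in metadata["arn"]

The blanket quote swap should be safe here, since ARNs and SageMaker resource names are not expected to contain quote characters.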