[AWS SageMaker] Integration Test for AWS SageMaker GroundTruth Component (#3830)

* Integration Test for AWS SageMaker GroundTruth Component

* Unfix already fixed bug

* Fix the README I overwrote by mistake

* Remove use of aws-secret for OIDC

* Rev 2: Fix linting errors
Meghna Baijal 2020-05-26 15:44:40 -07:00 committed by GitHub
parent 9f1c596566
commit fb549531f1
12 changed files with 367 additions and 8 deletions

View File

@@ -49,11 +49,11 @@ inputs:
    type: String
  - name: max_human_labeled_objects
    description: 'The maximum number of objects that can be labeled by human workers.'
    default: ''
    default: '0'
    type: Integer
  - name: max_percent_objects
    description: 'The maximum number of input data objects that should be labeled.'
    default: ''
    default: '0'
    type: Integer
  - name: enable_auto_labeling
    description: 'Enables auto-labeling, only for bounding box, text classification, and image classification.'

View File

@@ -2,10 +2,14 @@
1. [Docker](https://www.docker.com/)
1. [IAM Role](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-roles.html) with the SageMakerFullAccess and AmazonS3FullAccess policies attached
1. IAM User credentials with SageMakerFullAccess, AWSCloudFormationFullAccess, IAMFullAccess, AmazonEC2FullAccess, AmazonS3FullAccess permissions
1. The SageMaker WorkTeam and GroundTruth Component tests expect that at least one private workteam already exists in the region where you are running these tests; a quick pre-flight check is sketched below.
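
The snippet below is a minimal, hypothetical pre-flight check (not part of the test suite) that uses `boto3` to confirm the target region has a private workteam; the region value is a placeholder:

```python
# Hypothetical pre-flight check, not part of the test suite: confirm that
# at least one private workteam exists in the target region.
import boto3


def has_private_workteam(region):
    client = boto3.client("sagemaker", region_name=region)
    # list_workteams returns the private workteams for this account/region
    return len(client.list_workteams()["Workteams"]) > 0


if __name__ == "__main__":
    assert has_private_workteam("us-east-1"), "Create a private workteam first"
```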
## Creating S3 buckets with datasets
In the following Python script, change the bucket name and run the [`s3_sample_data_creator.py`](https://github.com/kubeflow/pipelines/tree/master/samples/contrib/aws-samples/mnist-kmeans-sagemaker#the-sample-dataset) to create an S3 bucket with the sample mnist dataset in the region where you want to run the tests.
1. In the following Python script, change the bucket name and run [`s3_sample_data_creator.py`](https://github.com/kubeflow/pipelines/tree/master/samples/contrib/aws-samples/mnist-kmeans-sagemaker#the-sample-dataset) to create an S3 bucket with the sample MNIST dataset in the region where you want to run the tests.
2. To prepare the dataset for the SageMaker GroundTruth Component test, follow the steps in the [GroundTruth Sample README](https://github.com/kubeflow/pipelines/tree/master/samples/contrib/aws-samples/ground_truth_pipeline_demo#prep-the-dataset-label-categories-and-ui-template). A minimal sketch of the bucket-creation step is shown below.
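
If you prefer to script the bucket setup yourself, the following is a minimal sketch (not the actual `s3_sample_data_creator.py`) that creates the bucket and uploads one manifest file; the bucket name and local file are placeholders:

```python
# Minimal sketch: create the test bucket in the target region and upload
# a sample manifest. Bucket name and local file path are placeholders.
import boto3

region = "us-east-1"           # region where the tests will run
bucket = "my-kfp-test-bucket"  # hypothetical name; change before running

s3 = boto3.client("s3", region_name=region)
# us-east-1 is the default location and must not be passed as a constraint.
if region == "us-east-1":
    s3.create_bucket(Bucket=bucket)
else:
    s3.create_bucket(
        Bucket=bucket,
        CreateBucketConfiguration={"LocationConstraint": region},
    )
s3.upload_file(
    "train.manifest",
    bucket,
    "mini-image-classification/ground-truth-demo/train.manifest",
)
```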
## Steps to run integration tests
1. Copy the `.env.example` file to `.env` and in the following steps modify the fields of this new file:

View File

@@ -0,0 +1,87 @@
import pytest
import os
import json
import utils
from utils import kfp_client_utils
from utils import sagemaker_utils
from test_workteam_component import create_workteamjob
import time


@pytest.mark.parametrize(
    "test_file_dir",
    [
        pytest.param(
            "resources/config/image-classification-groundtruth",
            marks=pytest.mark.canary_test,
        )
    ],
)
def test_groundtruth_labeling_job(
    kfp_client, experiment_id, region, sagemaker_client, test_file_dir
):
    download_dir = utils.mkdir(os.path.join(test_file_dir, "generated"))
    test_params = utils.load_params(
        utils.replace_placeholders(
            os.path.join(test_file_dir, "config.yaml"),
            os.path.join(download_dir, "config.yaml"),
        )
    )

    # First create a workteam using a separate pipeline and get the name
    # and ARN of the workteam that was created.
    workteam_name, _ = create_workteamjob(
        kfp_client,
        experiment_id,
        region,
        sagemaker_client,
        "resources/config/create-workteam",
        download_dir,
    )

    test_params["Arguments"][
        "workteam_arn"
    ] = workteam_arn = sagemaker_utils.get_workteam_arn(sagemaker_client, workteam_name)

    # Generate the ground_truth_train_job_name based on the workteam
    # that will be used for labeling.
    test_params["Arguments"][
        "ground_truth_train_job_name"
    ] = ground_truth_train_job_name = (
        test_params["Arguments"]["ground_truth_train_job_name"] + "-by-" + workteam_name
    )

    _ = kfp_client_utils.compile_run_monitor_pipeline(
        kfp_client,
        experiment_id,
        test_params["PipelineDefinition"],
        test_params["Arguments"],
        download_dir,
        test_params["TestName"],
        test_params["Timeout"],
        test_params["StatusToCheck"],
    )

    # Verify the GroundTruth job was created in SageMaker and is InProgress.
    # TODO: Add a bot to complete the labeling job and check for completion instead.
    try:
        response = sagemaker_utils.describe_labeling_job(
            sagemaker_client, ground_truth_train_job_name
        )
        assert response["LabelingJobStatus"] == "InProgress"

        # Verify that the workteam has the specified labeling job
        labeling_jobs = sagemaker_utils.list_labeling_jobs_for_workteam(
            sagemaker_client, workteam_arn
        )
        assert len(labeling_jobs["LabelingJobSummaryList"]) == 1
        assert (
            labeling_jobs["LabelingJobSummaryList"][0]["LabelingJobName"]
            == ground_truth_train_job_name
        )
    finally:
        # Clean up the SageMaker resources
        sagemaker_utils.stop_labeling_job(sagemaker_client, ground_truth_train_job_name)
        sagemaker_utils.delete_workteam(sagemaker_client, workteam_name)

    # Delete generated files
    utils.remove_dir(download_dir)

View File

@@ -0,0 +1,83 @@
import pytest
import os
import json
import utils
from utils import kfp_client_utils
from utils import sagemaker_utils
from utils import minio_utils


def create_workteamjob(
    kfp_client, experiment_id, region, sagemaker_client, test_file_dir, download_dir
):
    test_params = utils.load_params(
        utils.replace_placeholders(
            os.path.join(test_file_dir, "config.yaml"),
            os.path.join(download_dir, "config.yaml"),
        )
    )

    # Get the account- and region-specific user_pool and client_id for the
    # SageMaker workforce.
    (
        test_params["Arguments"]["user_pool"],
        test_params["Arguments"]["client_id"],
        test_params["Arguments"]["user_groups"],
    ) = sagemaker_utils.get_cognito_member_definitions(sagemaker_client)

    # Generate a random prefix for the workteam name to avoid collisions
    # with existing resources of the same name.
    test_params["Arguments"]["team_name"] = workteam_name = (
        utils.generate_random_string(5) + "-" + test_params["Arguments"]["team_name"]
    )

    _, _, workflow_json = kfp_client_utils.compile_run_monitor_pipeline(
        kfp_client,
        experiment_id,
        test_params["PipelineDefinition"],
        test_params["Arguments"],
        download_dir,
        test_params["TestName"],
        test_params["Timeout"],
    )
    return workteam_name, workflow_json


@pytest.mark.parametrize(
    "test_file_dir",
    [pytest.param("resources/config/create-workteam", marks=pytest.mark.canary_test)],
)
def test_workteamjob(
    kfp_client, experiment_id, region, sagemaker_client, test_file_dir
):
    download_dir = utils.mkdir(os.path.join(test_file_dir, "generated"))
    workteam_name, workflow_json = create_workteamjob(
        kfp_client, experiment_id, region, sagemaker_client, test_file_dir, download_dir
    )

    outputs = {"sagemaker-private-workforce": ["workteam_arn"]}
    output_files = minio_utils.artifact_download_iterator(
        workflow_json, outputs, download_dir
    )

    try:
        response = sagemaker_utils.describe_workteam(sagemaker_client, workteam_name)

        # Verify the workteam was created in SageMaker
        assert response["Workteam"]["CreateDate"] is not None
        assert response["Workteam"]["WorkteamName"] == workteam_name

        # Verify the workteam ARN artifact was created in MinIO and matches
        # the one in SageMaker
        workteam_arn = utils.read_from_file_in_tar(
            output_files["sagemaker-private-workforce"]["workteam_arn"],
            "workteam_arn.txt",
        )
        assert response["Workteam"]["WorkteamArn"] == workteam_arn
    finally:
        # Clean up the SageMaker resources
        sagemaker_utils.delete_workteam(sagemaker_client, workteam_name)

    # Delete generated files only if the test is successful
    utils.remove_dir(download_dir)
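
For context, KFP stores component output artifacts as gzipped tarballs in MinIO, so a helper like `utils.read_from_file_in_tar` (sketched hypothetically here, since the real helper lives in the repo's `utils` package) boils down to:

```python
# Hypothetical sketch of utils.read_from_file_in_tar: KFP output artifacts
# are .tgz archives, so extract the named member and return its text.
import tarfile


def read_from_file_in_tar(tar_path, file_name):
    with tarfile.open(tar_path, "r:gz") as tar:
        return tar.extractfile(file_name).read().decode()
```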

View File

@@ -87,6 +87,7 @@ def kfp_client():
    kfp_installed_namespace = utils.get_kfp_namespace()
    return kfp.Client(namespace=kfp_installed_namespace)


def get_experiment_id(kfp_client):
    exp_name = datetime.now().strftime("%Y-%m-%d-%H-%M")
    try:
@@ -95,6 +96,7 @@ def get_experiment_id(kfp_client):
    experiment = kfp_client.create_experiment(name=exp_name)
    return experiment.id


@pytest.fixture(scope="session")
def experiment_id(kfp_client, tmp_path_factory, worker_id):
    if not worker_id:
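
For context, the hunk above wires the `experiment_id` fixture to `tmp_path_factory` and `worker_id` so the suite can run under pytest-xdist. A hedged sketch of that pattern (assuming the repo's empty-`worker_id` convention and the `filelock` package; the pattern itself comes from the pytest-xdist docs) follows:

```python
# Hypothetical sketch of an xdist-safe, session-scoped fixture: the first
# worker creates the KFP experiment and caches its id in a file shared by
# every worker. Assumes the conftest's kfp_client and get_experiment_id.
import json

import pytest
from filelock import FileLock


@pytest.fixture(scope="session")
def experiment_id(kfp_client, tmp_path_factory, worker_id):
    if not worker_id:  # not running under pytest-xdist
        return get_experiment_id(kfp_client)
    # getbasetemp().parent is shared by all xdist workers
    cache = tmp_path_factory.getbasetemp().parent / "experiment_id.json"
    with FileLock(str(cache) + ".lock"):
        if cache.is_file():
            return json.loads(cache.read_text())
        exp_id = get_experiment_id(kfp_client)
        cache.write_text(json.dumps(exp_id))
    return exp_id
```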

View File

@@ -0,0 +1,10 @@
PipelineDefinition: resources/definition/workteam_pipeline.py
TestName: create-workteam
Timeout: 3600
Arguments:
  region: ((REGION))
  team_name: 'test-workteam'
  description: 'Team for GroundTruth Integ Test'
  user_pool: 'user-pool'
  user_groups: 'user-group'
  client_id: 'client-id'

View File

@@ -0,0 +1,22 @@
PipelineDefinition: resources/definition/groundtruth_pipeline.py
TestName: image-classification-groundtruth
Timeout: 10
StatusToCheck: 'running'
Arguments:
  region: ((REGION))
  role: ((ROLE_ARN))
  ground_truth_train_job_name: 'image-labeling'
  ground_truth_label_attribute_name: 'category'
  ground_truth_train_manifest_location: 's3://((DATA_BUCKET))/mini-image-classification/ground-truth-demo/train.manifest'
  ground_truth_output_location: 's3://((DATA_BUCKET))/mini-image-classification/ground-truth-demo/output'
  ground_truth_task_type: 'image classification'
  ground_truth_worker_type: 'private'
  ground_truth_label_category_config: 's3://((DATA_BUCKET))/mini-image-classification/ground-truth-demo/class_labels.json'
  ground_truth_ui_template: 's3://((DATA_BUCKET))/mini-image-classification/ground-truth-demo/instructions.template'
  ground_truth_title: 'Mini image classification'
  ground_truth_description: 'Test for Ground Truth KFP component'
  ground_truth_num_workers_per_object: '1'
  ground_truth_time_limit: '30'
  ground_truth_task_availibility: '3600'
  ground_truth_max_concurrent_tasks: '20'
  workteam_arn: 'workteam-arn'
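
The `((...))` tokens above are placeholders that `utils.replace_placeholders` fills in before the config is loaded. A minimal sketch of that substitution, under the assumption that the values come from environment variables (e.g. set via the `.env` file), could look like:

```python
# Hypothetical sketch of the ((PLACEHOLDER)) substitution performed by
# utils.replace_placeholders; the env-var source is an assumption.
import os
import re


def replace_placeholders(src_path, dst_path):
    with open(src_path) as f:
        text = f.read()
    # ((NAME)) -> value of the NAME environment variable
    text = re.sub(r"\(\((\w+)\)\)", lambda m: os.environ[m.group(1)], text)
    with open(dst_path, "w") as f:
        f.write(text)
    return dst_path
```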

View File

@@ -13,9 +13,9 @@ Arguments:
  image: ((KMEANS_REGISTRY)).dkr.ecr.((REGION)).amazonaws.com/kmeans:1
  model_artifact_url: s3://((DATA_BUCKET))/mnist_kmeans_example/model/kmeans-mnist-model/model.tar.gz
  variant_name_1: variant-1
  initial_variant_weight_1: 1.0
  instance_type_1: ml.m4.xlarge
  initial_instance_count_1: 1
  initial_variant_weight_1: 1.0
  network_isolation: "True"
  role: ((ROLE_ARN))

View File

@@ -0,0 +1,59 @@
import kfp
import json
import copy
from kfp import components
from kfp import dsl
from kfp.aws import use_aws_secret

sagemaker_gt_op = components.load_component_from_file(
    "../../ground_truth/component.yaml"
)


@dsl.pipeline(
    name="SageMaker GroundTruth image classification test pipeline",
    description="SageMaker GroundTruth image classification test pipeline",
)
def ground_truth_test(
    region="",
    ground_truth_train_job_name="",
    ground_truth_label_attribute_name="",
    ground_truth_train_manifest_location="",
    ground_truth_output_location="",
    ground_truth_task_type="",
    ground_truth_worker_type="",
    ground_truth_label_category_config="",
    ground_truth_ui_template="",
    ground_truth_title="",
    ground_truth_description="",
    ground_truth_num_workers_per_object="",
    ground_truth_time_limit="",
    ground_truth_task_availibility="",
    ground_truth_max_concurrent_tasks="",
    role="",
    workteam_arn="",
):
    ground_truth_train = sagemaker_gt_op(
        region=region,
        role=role,
        job_name=ground_truth_train_job_name,
        label_attribute_name=ground_truth_label_attribute_name,
        manifest_location=ground_truth_train_manifest_location,
        output_location=ground_truth_output_location,
        task_type=ground_truth_task_type,
        worker_type=ground_truth_worker_type,
        workteam_arn=workteam_arn,
        label_category_config=ground_truth_label_category_config,
        ui_template=ground_truth_ui_template,
        title=ground_truth_title,
        description=ground_truth_description,
        num_workers_per_object=ground_truth_num_workers_per_object,
        time_limit=ground_truth_time_limit,
        task_availibility=ground_truth_task_availibility,
        max_concurrent_tasks=ground_truth_max_concurrent_tasks,
    )


if __name__ == "__main__":
    kfp.compiler.Compiler().compile(ground_truth_test, __file__ + ".yaml")
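
Besides compiling to YAML as in `__main__` above, the pipeline can be submitted directly with the KFP SDK client; the sketch below uses the v1-style `create_run_from_pipeline_func` API, with placeholder ARNs and an assumed, reachable KFP endpoint:

```python
# Hypothetical direct-submission alternative to compiling to YAML.
# The account ids and ARNs below are placeholders.
import kfp

client = kfp.Client()  # assumes a configured KFP endpoint
client.create_run_from_pipeline_func(
    ground_truth_test,
    arguments={
        "region": "us-east-1",
        "role": "arn:aws:iam::111122223333:role/SageMakerRole",
        "workteam_arn": "arn:aws:sagemaker:us-east-1:111122223333:workteam/private-crowd/example",
    },
)
```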

View File

@@ -0,0 +1,36 @@
#!/usr/bin/env python3

import kfp
import json
import copy
from kfp import components
from kfp import dsl
from kfp.aws import use_aws_secret

sagemaker_workteam_op = components.load_component_from_file(
    "../../workteam/component.yaml"
)


@dsl.pipeline(
    name="SageMaker WorkTeam test pipeline",
    description="SageMaker WorkTeam test pipeline",
)
def workteam_test(
    region="", team_name="", description="", user_pool="", user_groups="", client_id=""
):
    workteam = sagemaker_workteam_op(
        region=region,
        team_name=team_name,
        description=description,
        user_pool=user_pool,
        user_groups=user_groups,
        client_id=client_id,
    )


if __name__ == "__main__":
    kfp.compiler.Compiler().compile(
        workteam_test, "SageMaker_WorkTeam_Pipelines" + ".yaml"
    )

View File

@@ -1,6 +1,7 @@
import os
import utils
import pytest
import time
from utils import argo_utils
@@ -23,9 +24,20 @@ def compile_and_run_pipeline(
    return run.id


def wait_for_job_completion(client, run_id, timeout):
def wait_for_job_completion(client, run_id, timeout, status_to_check):
    response = client.wait_for_run_completion(run_id, timeout)
    status = response.run.status.lower() == "succeeded"
    status = response.run.status.lower() == status_to_check
    return status


def wait_for_job_status(client, run_id, timeout, status_to_check="succeeded"):
    if status_to_check == "succeeded":
        status = wait_for_job_completion(client, run_id, timeout, status_to_check)
    else:
        # A run that is only expected to reach a non-terminal state
        # (e.g. "running") never completes, so wait out the timeout and
        # then check the current status once.
        time.sleep(timeout)
        response = client.get_run(run_id)
        status = response.run.status.lower() == status_to_check
    return status
@@ -43,6 +55,7 @@ def compile_run_monitor_pipeline(
    output_file_dir,
    pipeline_name,
    timeout,
    status_to_check="succeeded",
    check=True,
):
    run_id = compile_and_run_pipeline(
@@ -53,7 +66,7 @@ def compile_run_monitor_pipeline(
        output_file_dir,
        pipeline_name,
    )
    status = wait_for_job_completion(client, run_id, timeout)
    status = wait_for_job_status(client, run_id, timeout, status_to_check)
    workflow_json = get_workflow_json(client, run_id)
    if check and not status:

View File

@@ -24,3 +24,46 @@ def describe_hpo_job(client, job_name):
def describe_transform_job(client, job_name):
    return client.describe_transform_job(TransformJobName=job_name)


def describe_workteam(client, workteam_name):
    return client.describe_workteam(WorkteamName=workteam_name)


def list_workteams(client):
    return client.list_workteams()


def get_cognito_member_definitions(client):
    # This is one way to get the user_pool and client_id for the SageMaker
    # workforce. An alternative would be to take these values as user input
    # via params or a config file. The current mechanism expects that at
    # least one private workteam already exists in the region.
    default_workteam = list_workteams(client)["Workteams"][0]["MemberDefinitions"][0][
        "CognitoMemberDefinition"
    ]
    return (
        default_workteam["UserPool"],
        default_workteam["ClientId"],
        default_workteam["UserGroup"],
    )


def list_labeling_jobs_for_workteam(client, workteam_arn):
    return client.list_labeling_jobs_for_workteam(WorkteamArn=workteam_arn)


def describe_labeling_job(client, labeling_job_name):
    return client.describe_labeling_job(LabelingJobName=labeling_job_name)


def get_workteam_arn(client, workteam_name):
    response = describe_workteam(client, workteam_name)
    return response["Workteam"]["WorkteamArn"]


def delete_workteam(client, workteam_name):
    client.delete_workteam(WorkteamName=workteam_name)


def stop_labeling_job(client, labeling_job_name):
    client.stop_labeling_job(LabelingJobName=labeling_job_name)
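
For reference, these helpers are thin wrappers over the `boto3` SageMaker client, so a typical call site looks like the following (the region and workteam name are placeholders, and default `boto3` credentials are assumed):

```python
# Example usage of the helpers above with a plain boto3 client.
import boto3

client = boto3.client("sagemaker", region_name="us-east-1")
arn = get_workteam_arn(client, "example-team")        # hypothetical name
jobs = list_labeling_jobs_for_workteam(client, arn)
print(jobs["LabelingJobSummaryList"])
```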