From fb549531f1db3d4b3e2de0bba91822a3a9ebb12b Mon Sep 17 00:00:00 2001 From: Meghna Baijal <30911248+mbaijal@users.noreply.github.com> Date: Tue, 26 May 2020 15:44:40 -0700 Subject: [PATCH] [AWS SageMaker] Integration Test for AWS SageMaker GroundTruth Component (#3830) * Integration Test for AWS SageMaker GroundTruth Component * Unfix already fixed bug * Fix the README I overwrote by mistake * Remove use of aws-secret for OIDC * Rev 2: Fix linting errors --- .../aws/sagemaker/ground_truth/component.yaml | 4 +- .../tests/integration_tests/README.md | 6 +- .../test_groundtruth_component.py | 87 +++++++++++++++++++ .../test_workteam_component.py | 83 ++++++++++++++++++ .../tests/integration_tests/conftest.py | 4 +- .../config/create-workteam/config.yaml | 10 +++ .../config.yaml | 22 +++++ .../config/kmeans-mnist-endpoint/config.yaml | 2 +- .../definition/groundtruth_pipeline.py | 59 +++++++++++++ .../resources/definition/workteam_pipeline.py | 36 ++++++++ .../utils/kfp_client_utils.py | 19 +++- .../utils/sagemaker_utils.py | 43 +++++++++ 12 files changed, 367 insertions(+), 8 deletions(-) create mode 100644 components/aws/sagemaker/tests/integration_tests/component_tests/test_groundtruth_component.py create mode 100644 components/aws/sagemaker/tests/integration_tests/component_tests/test_workteam_component.py create mode 100644 components/aws/sagemaker/tests/integration_tests/resources/config/create-workteam/config.yaml create mode 100644 components/aws/sagemaker/tests/integration_tests/resources/config/image-classification-groundtruth/config.yaml create mode 100644 components/aws/sagemaker/tests/integration_tests/resources/definition/groundtruth_pipeline.py create mode 100644 components/aws/sagemaker/tests/integration_tests/resources/definition/workteam_pipeline.py diff --git a/components/aws/sagemaker/ground_truth/component.yaml b/components/aws/sagemaker/ground_truth/component.yaml index 3a143cc775..bb25b14a98 100644 --- 
a/components/aws/sagemaker/ground_truth/component.yaml +++ b/components/aws/sagemaker/ground_truth/component.yaml @@ -49,11 +49,11 @@ inputs: type: String - name: max_human_labeled_objects description: 'The maximum number of objects that can be labeled by human workers.' - default: '' + default: '0' type: Integer - name: max_percent_objects description: 'The maximum number of input data objects that should be labeled.' - default: '' + default: '0' type: Integer - name: enable_auto_labeling description: 'Enables auto-labeling, only for bounding box, text classification, and image classification.' diff --git a/components/aws/sagemaker/tests/integration_tests/README.md b/components/aws/sagemaker/tests/integration_tests/README.md index e43fbe525b..6076956ef5 100644 --- a/components/aws/sagemaker/tests/integration_tests/README.md +++ b/components/aws/sagemaker/tests/integration_tests/README.md @@ -2,10 +2,14 @@ 1. [Docker](https://www.docker.com/) 1. [IAM Role](https://docs.aws.amazon.com/sagemaker/latest/dg/sagemaker-roles.html) with a SageMakerFullAccess and AmazonS3FullAccess 1. IAM User credentials with SageMakerFullAccess, AWSCloudFormationFullAccess, IAMFullAccess, AmazonEC2FullAccess, AmazonS3FullAccess permissions +2. The SageMaker WorkTeam and GroundTruth Component tests expect that at least one private workteam already exists in the region where you are running these tests. + ## Creating S3 buckets with datasets -In the following Python script, change the bucket name and run the [`s3_sample_data_creator.py`](https://github.com/kubeflow/pipelines/tree/master/samples/contrib/aws-samples/mnist-kmeans-sagemaker#the-sample-dataset) to create an S3 bucket with the sample mnist dataset in the region where you want to run the tests. +1. 
In the following Python script, change the bucket name and run the [`s3_sample_data_creator.py`](https://github.com/kubeflow/pipelines/tree/master/samples/contrib/aws-samples/mnist-kmeans-sagemaker#the-sample-dataset) to create an S3 bucket with the sample mnist dataset in the region where you want to run the tests. +2. To prepare the dataset for the SageMaker GroundTruth Component test, follow the steps in the [GroundTruth Sample README](https://github.com/kubeflow/pipelines/tree/master/samples/contrib/aws-samples/ground_truth_pipeline_demo#prep-the-dataset-label-categories-and-ui-template). + ## Step to run integration tests 1. Copy the `.env.example` file to `.env` and in the following steps modify the fields of this new file: diff --git a/components/aws/sagemaker/tests/integration_tests/component_tests/test_groundtruth_component.py b/components/aws/sagemaker/tests/integration_tests/component_tests/test_groundtruth_component.py new file mode 100644 index 0000000000..18c0ad485d --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/component_tests/test_groundtruth_component.py @@ -0,0 +1,87 @@ +import pytest +import os +import json +import utils +from utils import kfp_client_utils +from utils import sagemaker_utils +from test_workteam_component import create_workteamjob +import time + + +@pytest.mark.parametrize( + "test_file_dir", + [ + pytest.param( + "resources/config/image-classification-groundtruth", + marks=pytest.mark.canary_test, + ) + ], +) +def test_groundtruth_labeling_job( + kfp_client, experiment_id, region, sagemaker_client, test_file_dir +): + + download_dir = utils.mkdir(os.path.join(test_file_dir + "/generated")) + test_params = utils.load_params( + utils.replace_placeholders( + os.path.join(test_file_dir, "config.yaml"), + os.path.join(download_dir, "config.yaml"), + ) + ) + + # First create a workteam using a separate pipeline and get the name, arn of the workteam created. 
+ workteam_name, _ = create_workteamjob( + kfp_client, + experiment_id, + region, + sagemaker_client, + "resources/config/create-workteam", + download_dir, + ) + + test_params["Arguments"][ + "workteam_arn" + ] = workteam_arn = sagemaker_utils.get_workteam_arn(sagemaker_client, workteam_name) + + # Generate the ground_truth_train_job_name based on the workteam which will be used for labeling. + test_params["Arguments"][ + "ground_truth_train_job_name" + ] = ground_truth_train_job_name = ( + test_params["Arguments"]["ground_truth_train_job_name"] + "-by-" + workteam_name + ) + + _ = kfp_client_utils.compile_run_monitor_pipeline( + kfp_client, + experiment_id, + test_params["PipelineDefinition"], + test_params["Arguments"], + download_dir, + test_params["TestName"], + test_params["Timeout"], + test_params["StatusToCheck"], + ) + + # Verify the GroundTruthJob was created in SageMaker and is InProgress. + # TODO: Add a bot to complete the labeling job and check for completion instead. + try: + response = sagemaker_utils.describe_labeling_job( + sagemaker_client, ground_truth_train_job_name + ) + assert response["LabelingJobStatus"] == "InProgress" + + # Verify that the workteam has the specified labeling job + labeling_jobs = sagemaker_utils.list_labeling_jobs_for_workteam( + sagemaker_client, workteam_arn + ) + assert len(labeling_jobs["LabelingJobSummaryList"]) == 1 + assert ( + labeling_jobs["LabelingJobSummaryList"][0]["LabelingJobName"] + == ground_truth_train_job_name + ) + finally: + # Cleanup the SageMaker Resources + sagemaker_utils.stop_labeling_job(sagemaker_client, ground_truth_train_job_name) + sagemaker_utils.delete_workteam(sagemaker_client, workteam_name) + + # Delete generated files + utils.remove_dir(download_dir) diff --git a/components/aws/sagemaker/tests/integration_tests/component_tests/test_workteam_component.py b/components/aws/sagemaker/tests/integration_tests/component_tests/test_workteam_component.py new file mode 100644 index 
0000000000..e2243de887 --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/component_tests/test_workteam_component.py @@ -0,0 +1,83 @@ +import pytest +import os +import json +import utils +from utils import kfp_client_utils +from utils import sagemaker_utils +from utils import minio_utils + + +def create_workteamjob( + kfp_client, experiment_id, region, sagemaker_client, test_file_dir, download_dir +): + + test_params = utils.load_params( + utils.replace_placeholders( + os.path.join(test_file_dir, "config.yaml"), + os.path.join(download_dir, "config.yaml"), + ) + ) + + # Get the account, region specific user_pool and client_id for the Sagemaker Workforce. + ( + test_params["Arguments"]["user_pool"], + test_params["Arguments"]["client_id"], + test_params["Arguments"]["user_groups"], + ) = sagemaker_utils.get_cognito_member_definitions(sagemaker_client) + + # Generate random prefix for workteam_name to avoid errors if resources with same name exists + test_params["Arguments"]["team_name"] = workteam_name = ( + utils.generate_random_string(5) + "-" + test_params["Arguments"]["team_name"] + ) + + _, _, workflow_json = kfp_client_utils.compile_run_monitor_pipeline( + kfp_client, + experiment_id, + test_params["PipelineDefinition"], + test_params["Arguments"], + download_dir, + test_params["TestName"], + test_params["Timeout"], + ) + + return workteam_name, workflow_json + + +@pytest.mark.parametrize( + "test_file_dir", + [pytest.param("resources/config/create-workteam", marks=pytest.mark.canary_test)], +) +def test_workteamjob( + kfp_client, experiment_id, region, sagemaker_client, test_file_dir +): + + download_dir = utils.mkdir(os.path.join(test_file_dir + "/generated")) + workteam_name, workflow_json = create_workteamjob( + kfp_client, experiment_id, region, sagemaker_client, test_file_dir, download_dir + ) + + outputs = {"sagemaker-private-workforce": ["workteam_arn"]} + output_files = minio_utils.artifact_download_iterator( + workflow_json, 
outputs, download_dir + ) + + try: + response = sagemaker_utils.describe_workteam(sagemaker_client, workteam_name) + + # Verify WorkTeam was created in SageMaker + assert response["Workteam"]["CreateDate"] is not None + assert response["Workteam"]["WorkteamName"] == workteam_name + + # Verify WorkTeam arn artifact was created in Minio and matches the one in SageMaker + workteam_arn = utils.read_from_file_in_tar( + output_files["sagemaker-private-workforce"]["workteam_arn"], + "workteam_arn.txt", + ) + assert response["Workteam"]["WorkteamArn"] == workteam_arn + + finally: + # Cleanup the SageMaker Resources + sagemaker_utils.delete_workteam(sagemaker_client, workteam_name) + + # Delete generated files only if the test is successful + utils.remove_dir(download_dir) diff --git a/components/aws/sagemaker/tests/integration_tests/conftest.py b/components/aws/sagemaker/tests/integration_tests/conftest.py index 52c29656cc..c022cadd2c 100644 --- a/components/aws/sagemaker/tests/integration_tests/conftest.py +++ b/components/aws/sagemaker/tests/integration_tests/conftest.py @@ -87,6 +87,7 @@ def kfp_client(): kfp_installed_namespace = utils.get_kfp_namespace() return kfp.Client(namespace=kfp_installed_namespace) + def get_experiment_id(kfp_client): exp_name = datetime.now().strftime("%Y-%m-%d-%H-%M") try: @@ -95,6 +96,7 @@ def get_experiment_id(kfp_client): experiment = kfp_client.create_experiment(name=exp_name) return experiment.id + @pytest.fixture(scope="session") def experiment_id(kfp_client, tmp_path_factory, worker_id): if not worker_id: @@ -112,4 +114,4 @@ def experiment_id(kfp_client, tmp_path_factory, worker_id): else: data = get_experiment_id(kfp_client) fn.write_text(data) - return data \ No newline at end of file + return data diff --git a/components/aws/sagemaker/tests/integration_tests/resources/config/create-workteam/config.yaml b/components/aws/sagemaker/tests/integration_tests/resources/config/create-workteam/config.yaml new file mode 100644 index 
0000000000..244148e6ab --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/resources/config/create-workteam/config.yaml @@ -0,0 +1,10 @@ +PipelineDefinition: resources/definition/workteam_pipeline.py +TestName: create-workteam +Timeout: 3600 +Arguments: + region: ((REGION)) + team_name: 'test-workteam' + description: 'Team for GroundTruth Integ Test' + user_pool: 'user-pool' + user_groups: 'user-group' + client_id: 'client-id' diff --git a/components/aws/sagemaker/tests/integration_tests/resources/config/image-classification-groundtruth/config.yaml b/components/aws/sagemaker/tests/integration_tests/resources/config/image-classification-groundtruth/config.yaml new file mode 100644 index 0000000000..bd583c9e69 --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/resources/config/image-classification-groundtruth/config.yaml @@ -0,0 +1,22 @@ +PipelineDefinition: resources/definition/groundtruth_pipeline.py +TestName: image-classification-groundtruth +Timeout: 10 +StatusToCheck: 'running' +Arguments: + region: ((REGION)) + role: ((ROLE_ARN)) + ground_truth_train_job_name: 'image-labeling' + ground_truth_label_attribute_name: 'category' + ground_truth_train_manifest_location: 's3://((DATA_BUCKET))/mini-image-classification/ground-truth-demo/train.manifest' + ground_truth_output_location: 's3://((DATA_BUCKET))/mini-image-classification/ground-truth-demo/output' + ground_truth_task_type: 'image classification' + ground_truth_worker_type: 'private' + ground_truth_label_category_config: 's3://((DATA_BUCKET))/mini-image-classification/ground-truth-demo/class_labels.json' + ground_truth_ui_template: 's3://((DATA_BUCKET))/mini-image-classification/ground-truth-demo/instructions.template' + ground_truth_title: 'Mini image classification' + ground_truth_description: 'Test for Ground Truth KFP component' + ground_truth_num_workers_per_object: '1' + ground_truth_time_limit: '30' + ground_truth_task_availibility: '3600' + 
ground_truth_max_concurrent_tasks: '20' + workteam_arn: 'workteam-arn' diff --git a/components/aws/sagemaker/tests/integration_tests/resources/config/kmeans-mnist-endpoint/config.yaml b/components/aws/sagemaker/tests/integration_tests/resources/config/kmeans-mnist-endpoint/config.yaml index f4a413c828..d75320eb6b 100644 --- a/components/aws/sagemaker/tests/integration_tests/resources/config/kmeans-mnist-endpoint/config.yaml +++ b/components/aws/sagemaker/tests/integration_tests/resources/config/kmeans-mnist-endpoint/config.yaml @@ -13,9 +13,9 @@ Arguments: image: ((KMEANS_REGISTRY)).dkr.ecr.((REGION)).amazonaws.com/kmeans:1 model_artifact_url: s3://((DATA_BUCKET))/mnist_kmeans_example/model/kmeans-mnist-model/model.tar.gz variant_name_1: variant-1 + initial_variant_weight_1: 1.0 instance_type_1: ml.m4.xlarge initial_instance_count_1: 1 - initial_variant_weight_1: 1.0 network_isolation: "True" role: ((ROLE_ARN)) \ No newline at end of file diff --git a/components/aws/sagemaker/tests/integration_tests/resources/definition/groundtruth_pipeline.py b/components/aws/sagemaker/tests/integration_tests/resources/definition/groundtruth_pipeline.py new file mode 100644 index 0000000000..23456abf54 --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/resources/definition/groundtruth_pipeline.py @@ -0,0 +1,59 @@ +import kfp +import json +import copy +from kfp import components +from kfp import dsl +from kfp.aws import use_aws_secret + +sagemaker_gt_op = components.load_component_from_file( + "../../ground_truth/component.yaml" +) + + +@dsl.pipeline( + name="SageMaker GroundTruth image classification test pipeline", + description="SageMaker GroundTruth image classification test pipeline", +) +def ground_truth_test( + region="", + ground_truth_train_job_name="", + ground_truth_label_attribute_name="", + ground_truth_train_manifest_location="", + ground_truth_output_location="", + ground_truth_task_type="", + ground_truth_worker_type="", + 
ground_truth_label_category_config="", + ground_truth_ui_template="", + ground_truth_title="", + ground_truth_description="", + ground_truth_num_workers_per_object="", + ground_truth_time_limit="", + ground_truth_task_availibility="", + ground_truth_max_concurrent_tasks="", + role="", + workteam_arn="", +): + + ground_truth_train = sagemaker_gt_op( + region=region, + role=role, + job_name=ground_truth_train_job_name, + label_attribute_name=ground_truth_label_attribute_name, + manifest_location=ground_truth_train_manifest_location, + output_location=ground_truth_output_location, + task_type=ground_truth_task_type, + worker_type=ground_truth_worker_type, + workteam_arn=workteam_arn, + label_category_config=ground_truth_label_category_config, + ui_template=ground_truth_ui_template, + title=ground_truth_title, + description=ground_truth_description, + num_workers_per_object=ground_truth_num_workers_per_object, + time_limit=ground_truth_time_limit, + task_availibility=ground_truth_task_availibility, + max_concurrent_tasks=ground_truth_max_concurrent_tasks, + ) + + +if __name__ == "__main__": + kfp.compiler.Compiler().compile(ground_truth_test, __file__ + ".yaml") diff --git a/components/aws/sagemaker/tests/integration_tests/resources/definition/workteam_pipeline.py b/components/aws/sagemaker/tests/integration_tests/resources/definition/workteam_pipeline.py new file mode 100644 index 0000000000..3e22733012 --- /dev/null +++ b/components/aws/sagemaker/tests/integration_tests/resources/definition/workteam_pipeline.py @@ -0,0 +1,36 @@ +#!/usr/bin/env python3 + +import kfp +import json +import copy +from kfp import components +from kfp import dsl +from kfp.aws import use_aws_secret + +sagemaker_workteam_op = components.load_component_from_file( + "../../workteam/component.yaml" +) + + +@dsl.pipeline( + name="SageMaker WorkTeam test pipeline", + description="SageMaker WorkTeam test pipeline", +) +def workteam_test( + region="", team_name="", description="", user_pool="", 
user_groups="", client_id="" +): + + workteam = sagemaker_workteam_op( + region=region, + team_name=team_name, + description=description, + user_pool=user_pool, + user_groups=user_groups, + client_id=client_id, + ) + + +if __name__ == "__main__": + kfp.compiler.Compiler().compile( + workteam_test, "SageMaker_WorkTeam_Pipelines" + ".yaml" + ) diff --git a/components/aws/sagemaker/tests/integration_tests/utils/kfp_client_utils.py b/components/aws/sagemaker/tests/integration_tests/utils/kfp_client_utils.py index 9d3882d7b8..2949f7b3c0 100644 --- a/components/aws/sagemaker/tests/integration_tests/utils/kfp_client_utils.py +++ b/components/aws/sagemaker/tests/integration_tests/utils/kfp_client_utils.py @@ -1,6 +1,7 @@ import os import utils import pytest +import time from utils import argo_utils @@ -23,9 +24,20 @@ def compile_and_run_pipeline( return run.id -def wait_for_job_completion(client, run_id, timeout): +def wait_for_job_completion(client, run_id, timeout, status_to_check): response = client.wait_for_run_completion(run_id, timeout) - status = response.run.status.lower() == "succeeded" + status = response.run.status.lower() == status_to_check + return status + + +def wait_for_job_status(client, run_id, timeout, status_to_check="succeeded"): + if status_to_check == "succeeded": + status = wait_for_job_completion(client, run_id, timeout, status_to_check) + else: + time.sleep(timeout) + response = client.get_run(run_id) + status = response.run.status.lower() == status_to_check + return status @@ -43,6 +55,7 @@ def compile_run_monitor_pipeline( output_file_dir, pipeline_name, timeout, + status_to_check="succeeded", check=True, ): run_id = compile_and_run_pipeline( @@ -53,7 +66,7 @@ def compile_run_monitor_pipeline( output_file_dir, pipeline_name, ) - status = wait_for_job_completion(client, run_id, timeout) + status = wait_for_job_status(client, run_id, timeout, status_to_check) workflow_json = get_workflow_json(client, run_id) if check and not status: diff --git 
a/components/aws/sagemaker/tests/integration_tests/utils/sagemaker_utils.py b/components/aws/sagemaker/tests/integration_tests/utils/sagemaker_utils.py index ad963b4ebc..4898d89adb 100644 --- a/components/aws/sagemaker/tests/integration_tests/utils/sagemaker_utils.py +++ b/components/aws/sagemaker/tests/integration_tests/utils/sagemaker_utils.py @@ -24,3 +24,46 @@ def describe_hpo_job(client, job_name): def describe_transform_job(client, job_name): return client.describe_transform_job(TransformJobName=job_name) + + +def describe_workteam(client, workteam_name): + return client.describe_workteam(WorkteamName=workteam_name) + + +def list_workteams(client): + return client.list_workteams() + + +def get_cognito_member_definitions(client): + # This is one way to get the user_pool and client_id for the Sagemaker Workforce. + # An alternative would be to take these values as user input via params or a config file. + # The current mechanism expects that there exists at least one private workteam in the region. + default_workteam = list_workteams(client)["Workteams"][0]["MemberDefinitions"][0][ + "CognitoMemberDefinition" + ] + return ( + default_workteam["UserPool"], + default_workteam["ClientId"], + default_workteam["UserGroup"], + ) + + +def list_labeling_jobs_for_workteam(client, workteam_arn): + return client.list_labeling_jobs_for_workteam(WorkteamArn=workteam_arn) + + +def describe_labeling_job(client, labeling_job_name): + return client.describe_labeling_job(LabelingJobName=labeling_job_name) + + +def get_workteam_arn(client, workteam_name): + response = describe_workteam(client, workteam_name) + return response["Workteam"]["WorkteamArn"] + + +def delete_workteam(client, workteam_name): + client.delete_workteam(WorkteamName=workteam_name) + + +def stop_labeling_job(client, labeling_job_name): + client.stop_labeling_job(LabelingJobName=labeling_job_name)