[AWS SageMaker] Add working FSx setup and test (#3831)
* Add working FSx setup and test * Removed duplicate test function * Replaced failure return with exit * Update parallel methods to export * Update EKS cluster name outside parallel task * Add SKIP_FSX_TEST in buildspec * Add revoke security group ingress * Add default pytest FSx values
This commit is contained in:
parent
1e2b9d4e7e
commit
37a63638c7
|
|
@ -2,7 +2,7 @@ version: 0.2
|
|||
|
||||
env:
|
||||
variables:
|
||||
CONTAINER_VARIABLES: "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI EKS_PRIVATE_SUBNETS EKS_PUBLIC_SUBNETS PYTEST_ADDOPTS S3_DATA_BUCKET EKS_EXISTING_CLUSTER SAGEMAKER_EXECUTION_ROLE_ARN REGION"
|
||||
CONTAINER_VARIABLES: "AWS_CONTAINER_CREDENTIALS_RELATIVE_URI EKS_PRIVATE_SUBNETS EKS_PUBLIC_SUBNETS PYTEST_ADDOPTS S3_DATA_BUCKET EKS_EXISTING_CLUSTER SAGEMAKER_EXECUTION_ROLE_ARN REGION SKIP_FSX_TESTS"
|
||||
|
||||
phases:
|
||||
build:
|
||||
|
|
|
|||
|
|
@ -9,4 +9,7 @@ SAGEMAKER_EXECUTION_ROLE_ARN=arn:aws:iam::123456789012:role/service-role/AmazonS
|
|||
S3_DATA_BUCKET=my-data-bucket
|
||||
|
||||
# If you hope to use an existing EKS cluster, rather than creating a new one.
|
||||
# EKS_EXISTING_CLUSTER=my-eks-cluster
|
||||
# EKS_EXISTING_CLUSTER=my-eks-cluster
|
||||
|
||||
# If you would like to skip the FSx set-up and tests
|
||||
# SKIP_FSX_TESTS=true
|
||||
|
|
@ -12,7 +12,8 @@ from utils import sagemaker_utils
|
|||
[
|
||||
pytest.param(
|
||||
"resources/config/simple-mnist-training", marks=pytest.mark.canary_test
|
||||
)
|
||||
),
|
||||
pytest.param("resources/config/fsx-mnist-training", marks=pytest.mark.fsx_test),
|
||||
],
|
||||
)
|
||||
def test_trainingjob(
|
||||
|
|
|
|||
|
|
@ -35,6 +35,24 @@ def pytest_addoption(parser):
|
|||
required=False,
|
||||
help="Cluster namespace where kubeflow pipelines is installed",
|
||||
)
|
||||
parser.addoption(
|
||||
"--fsx-subnet",
|
||||
required=False,
|
||||
help="The subnet in which FSx is installed",
|
||||
default="",
|
||||
)
|
||||
parser.addoption(
|
||||
"--fsx-security-group",
|
||||
required=False,
|
||||
help="The security group SageMaker should use when running the FSx test",
|
||||
default="",
|
||||
)
|
||||
parser.addoption(
|
||||
"--fsx-id",
|
||||
required=False,
|
||||
help="The file system ID of the FSx instance",
|
||||
default="",
|
||||
)
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
|
|
@ -67,6 +85,24 @@ def kfp_namespace(request):
|
|||
return request.config.getoption("--kfp-namespace")
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def fsx_subnet(request):
|
||||
os.environ["FSX_SUBNET"] = request.config.getoption("--fsx-subnet")
|
||||
return request.config.getoption("--fsx-subnet")
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def fsx_security_group(request):
|
||||
os.environ["FSX_SECURITY_GROUP"] = request.config.getoption("--fsx-security-group")
|
||||
return request.config.getoption("--fsx-security-group")
|
||||
|
||||
|
||||
@pytest.fixture(scope="session", autouse=True)
|
||||
def fsx_id(request):
|
||||
os.environ["FSX_ID"] = request.config.getoption("--fsx-id")
|
||||
return request.config.getoption("--fsx-id")
|
||||
|
||||
|
||||
@pytest.fixture(scope="session")
|
||||
def boto3_session(region):
|
||||
return boto3.Session(region_name=region)
|
||||
|
|
|
|||
|
|
@ -1,4 +1,6 @@
|
|||
[pytest]
|
||||
junit_family = xunit2
|
||||
addopts = -rA
|
||||
markers =
|
||||
canary_test: test to be run as part of canaries.
|
||||
canary_test: test to be run as part of canaries.
|
||||
fsx_test: tests for FSx features
|
||||
|
|
@ -0,0 +1,36 @@
|
|||
PipelineDefinition: resources/definition/training_pipeline.py
|
||||
TestName: fsx-mnist-training
|
||||
Timeout: 3600
|
||||
ExpectedTrainingImage: ((KMEANS_REGISTRY)).dkr.ecr.((REGION)).amazonaws.com/kmeans:1
|
||||
Arguments:
|
||||
region: ((REGION))
|
||||
image: ((KMEANS_REGISTRY)).dkr.ecr.((REGION)).amazonaws.com/kmeans:1
|
||||
training_input_mode: File
|
||||
hyperparameters:
|
||||
k: "10"
|
||||
feature_dim: "784"
|
||||
channels:
|
||||
- ChannelName: train
|
||||
DataSource:
|
||||
FileSystemDataSource:
|
||||
FileSystemType: FSxLustre
|
||||
FileSystemAccessMode: ro
|
||||
FileSystemId: ((FSX_ID))
|
||||
DirectoryPath: /fsx/mnist_kmeans_example/input
|
||||
CompressionType: None
|
||||
ContentType: text/csv;label_size=0
|
||||
RecordWrapperType: None
|
||||
InputMode: File
|
||||
vpc_security_group_ids: ((FSX_SECURITY_GROUP))
|
||||
vpc_subnets: ((FSX_SUBNET))
|
||||
instance_type: ml.m5.xlarge
|
||||
instance_count: 1
|
||||
volume_size: 50
|
||||
max_run_time: 3600
|
||||
model_artifact_path: s3://((DATA_BUCKET))/mnist_kmeans_example/output
|
||||
network_isolation: "True"
|
||||
traffic_encryption: "False"
|
||||
spot_instance: "False"
|
||||
max_wait_time: 3600
|
||||
checkpoint_config: "{}"
|
||||
role: ((ROLE_ARN))
|
||||
|
|
@ -25,6 +25,8 @@ def training_pipeline(
|
|||
spot_instance="",
|
||||
max_wait_time="",
|
||||
checkpoint_config="{}",
|
||||
vpc_security_group_ids="",
|
||||
vpc_subnets="",
|
||||
role="",
|
||||
):
|
||||
sagemaker_train_op(
|
||||
|
|
@ -45,6 +47,8 @@ def training_pipeline(
|
|||
spot_instance=spot_instance,
|
||||
max_wait_time=max_wait_time,
|
||||
checkpoint_config=checkpoint_config,
|
||||
vpc_security_group_ids=vpc_security_group_ids,
|
||||
vpc_subnets=vpc_subnets,
|
||||
role=role,
|
||||
)
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,79 @@
|
|||
#!/usr/bin/env bash
|
||||
|
||||
# Helper script that provides a set of methods to configure VPC, EFS and FSx
|
||||
# ready for the full suite of integration tests.
|
||||
|
||||
function create_fsx_security_group() {
|
||||
echo "[Creating FSx Security Group] Creating security group"
|
||||
|
||||
IFS=',' read -r -a subnets_list <<< "$EKS_PRIVATE_SUBNETS"
|
||||
local vpc_id="$(aws ec2 describe-subnets --subnet-ids "${subnets_list[0]}" \
|
||||
--output text --query "Subnets[0].VpcId" --region ${REGION})"
|
||||
|
||||
local fsx_security_group="${DEPLOY_NAME}-fsx-sg"
|
||||
FSX_SECURITY_GROUP_ID="$(aws ec2 create-security-group --region "${REGION}" \
|
||||
--vpc-id ${vpc_id} \
|
||||
--description "Security group for FSx in ${DEPLOY_NAME}" \
|
||||
--group-name "${fsx_security_group}" --output text --query "GroupId")"
|
||||
|
||||
# Open FSx port to internal security group
|
||||
aws ec2 authorize-security-group-ingress \
|
||||
--region "${REGION}" --group-id "${FSX_SECURITY_GROUP_ID}" \
|
||||
--protocol tcp --port 988 --source-group "${FSX_SECURITY_GROUP_ID}"
|
||||
|
||||
echo "[Creating FSx Security Group] Created security group ${FSX_SECURITY_GROUP_ID}"
|
||||
}
|
||||
|
||||
function cleanup_fsx_security_group() {
|
||||
if [ ! -z "${FSX_SECURITY_GROUP_ID}" ]; then
|
||||
# You must remove any self-referencing ingress rules before deleting a SG
|
||||
aws ec2 revoke-security-group-ingress --region "${REGION}" \
|
||||
--group-id "${FSX_SECURITY_GROUP_ID}" --protocol tcp --port 988 \
|
||||
--source-group "${FSX_SECURITY_GROUP_ID}"
|
||||
|
||||
aws ec2 delete-security-group --group-id "${FSX_SECURITY_GROUP_ID}" --region "${REGION}"
|
||||
fi
|
||||
}
|
||||
|
||||
# Creates a new FSX LUSTRE instance and automatically imports the data set from S3.
|
||||
function create_fsx_instance() {
|
||||
echo "[Creating FSx] Creating file system"
|
||||
IFS=',' read -r -a subnets_list <<< "$EKS_PRIVATE_SUBNETS"
|
||||
|
||||
local fs_id=$(aws fsx create-file-system \
|
||||
--file-system-type LUSTRE \
|
||||
--lustre-configuration ImportPath=s3://${S3_DATA_BUCKET}/mnist_kmeans_example \
|
||||
--storage-capacity 1200 \
|
||||
--subnet-ids "${subnets_list[0]}" \
|
||||
--security-group-ids "${FSX_SECURITY_GROUP_ID}" \
|
||||
--tags Key="Name",Value=fsx-integ-lustre \
|
||||
--region "${REGION}" \
|
||||
--output text \
|
||||
--query "FileSystem.FileSystemId")
|
||||
|
||||
echo "[Creating FSx] Waiting for file system to be in state AVAILABLE"
|
||||
|
||||
local fs_status="CREATING"
|
||||
until [[ "${fs_status}" != "CREATING" ]]; do
|
||||
fs_status="$(aws fsx describe-file-systems --region "${REGION}" --file-system-id ${fs_id} --output text --query "FileSystems[0].Lifecycle")"
|
||||
sleep 10
|
||||
done
|
||||
aws fsx --region "${REGION}" describe-file-systems --file-system-id ${fs_id}
|
||||
|
||||
if [[ "${fs_status}" != "AVAILABLE" ]]; then
|
||||
echo "[Creating FSx] FSx cluster never reached state 'Available'"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
FSX_ID="${fs_id}"
|
||||
|
||||
echo "[Creating FSx] File system now available as ${FSX_ID}"
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
function delete_fsx_instance() {
|
||||
if [ ! -z "${FSX_ID}" ]; then
|
||||
aws fsx delete-file-system --file-system-id "${FSX_ID}" --region "${REGION}"
|
||||
fi
|
||||
}
|
||||
|
|
@ -9,6 +9,7 @@ usage(){
|
|||
}
|
||||
|
||||
cwd=$(dirname $(realpath $0))
|
||||
source "$cwd"/fsx_setup
|
||||
|
||||
### Input parameters
|
||||
DEPLOY_NAME="sagemaker-kfp-"$(date '+%Y-%m-%d-%H-%M-%S')"" # The name given to the entire deployment (tagging all resources)
|
||||
|
|
@ -30,6 +31,8 @@ PYTEST_MARKER=${PYTEST_MARKER:-""}
|
|||
S3_DATA_BUCKET=${S3_DATA_BUCKET:-""}
|
||||
SAGEMAKER_EXECUTION_ROLE_ARN=${SAGEMAKER_EXECUTION_ROLE_ARN:-""}
|
||||
|
||||
SKIP_FSX_TESTS=${SKIP_FSX_TESTS:-"false"}
|
||||
|
||||
while getopts ":n:r:s:" opt; do
|
||||
case $opt in
|
||||
n)
|
||||
|
|
@ -65,12 +68,25 @@ if [ "$S3_DATA_BUCKET" == "" ]; then
|
|||
exit 1
|
||||
fi
|
||||
|
||||
if [[ "$SKIP_FSX_TESTS" == "false" && "$EKS_PRIVATE_SUBNETS" == "" ]]; then
|
||||
echo "Missing EKS private subnets"
|
||||
usage
|
||||
exit 1
|
||||
fi
|
||||
|
||||
function cleanup() {
|
||||
set +e
|
||||
|
||||
cleanup_kfp
|
||||
delete_generated_role
|
||||
|
||||
if [[ "${SKIP_FSX_TESTS}" == "false" ]]; then
|
||||
delete_fsx_instance
|
||||
# Sleep in order for the security group to detach before attempting to delete it
|
||||
sleep 15s
|
||||
cleanup_fsx_security_group
|
||||
fi
|
||||
|
||||
if [[ -z "${EKS_EXISTING_CLUSTER}" ]]; then
|
||||
delete_eks
|
||||
fi
|
||||
|
|
@ -81,8 +97,6 @@ trap cleanup EXIT
|
|||
set -e
|
||||
|
||||
function launch_eks() {
|
||||
EKS_CLUSTER_NAME="${DEPLOY_NAME}-eks-cluster"
|
||||
|
||||
echo "[Creating EKS] Launching EKS cluster $EKS_CLUSTER_NAME"
|
||||
|
||||
eksctl_args=( --managed --nodes "${EKS_NODE_COUNT}" --node-type=c5.xlarge --timeout=30m --region "${REGION}" --auto-kubeconfig --version "${EKS_CLUSTER_VERSION}" )
|
||||
|
|
@ -150,11 +164,26 @@ function cleanup_kfp() {
|
|||
}
|
||||
|
||||
if [[ -z "${EKS_EXISTING_CLUSTER}" ]]; then
|
||||
launch_eks
|
||||
# Launch all of these in parallel to reduce start-up time
|
||||
EKS_CLUSTER_NAME="${DEPLOY_NAME}-eks-cluster"
|
||||
launch_eks &
|
||||
|
||||
if [[ "${SKIP_FSX_TESTS}" == "false" ]]; then
|
||||
create_fsx_security_group
|
||||
create_fsx_instance
|
||||
fi
|
||||
|
||||
wait
|
||||
else
|
||||
aws eks update-kubeconfig --name "${EKS_EXISTING_CLUSTER}" --region "$REGION"
|
||||
EKS_CLUSTER_NAME="${EKS_EXISTING_CLUSTER}"
|
||||
DEPLOY_NAME="${EKS_EXISTING_CLUSTER}"
|
||||
|
||||
if [[ "${SKIP_FSX_TESTS}" == "false" ]]; then
|
||||
create_fsx_security_group
|
||||
create_fsx_instance
|
||||
fi
|
||||
wait
|
||||
fi
|
||||
|
||||
generate_iam_role_name
|
||||
|
|
@ -163,6 +192,15 @@ install_kfp
|
|||
install_generated_role
|
||||
|
||||
pytest_args=( --region "${REGION}" --role-arn "${SAGEMAKER_EXECUTION_ROLE_ARN}" --s3-data-bucket "${S3_DATA_BUCKET}" --minio-service-port "${MINIO_LOCAL_PORT}" --kfp-namespace "${KFP_NAMESPACE}" )
|
||||
|
||||
if [[ "${SKIP_FSX_TESTS}" == "true" ]]; then
|
||||
pytest_args+=( -m "not fsx_test" )
|
||||
else
|
||||
# Get the VPC arguments for the FSx test
|
||||
IFS=',' read -r -a private_subnets <<< "$EKS_PRIVATE_SUBNETS"
|
||||
pytest_args+=( --fsx-subnet "${private_subnets[0]}" --fsx-security-group "${FSX_SECURITY_GROUP_ID}" --fsx-id "${FSX_ID}" )
|
||||
fi
|
||||
|
||||
[ ! -z "${PYTEST_MARKER}" ] && pytest_args+=( -m "${PYTEST_MARKER}" )
|
||||
|
||||
cd tests/integration_tests && python -m pytest "${pytest_args[@]}" --junitxml ./integration_tests.log -n $(nproc)
|
||||
|
|
@ -30,6 +30,18 @@ def get_kfp_namespace():
|
|||
return os.environ.get("NAMESPACE")
|
||||
|
||||
|
||||
def get_fsx_subnet():
|
||||
return os.environ.get("FSX_SUBNET")
|
||||
|
||||
|
||||
def get_fsx_security_group():
|
||||
return os.environ.get("FSX_SECURITY_GROUP")
|
||||
|
||||
|
||||
def get_fsx_id():
|
||||
return os.environ.get("FSX_ID")
|
||||
|
||||
|
||||
def get_algorithm_image_registry(region, algorithm):
|
||||
return get_image_uri(region, algorithm).split(".")[0]
|
||||
|
||||
|
|
@ -61,12 +73,17 @@ def replace_placeholders(input_filename, output_filename):
|
|||
"((ROLE_ARN))": get_role_arn(),
|
||||
"((DATA_BUCKET))": get_s3_data_bucket(),
|
||||
"((KMEANS_REGISTRY))": get_algorithm_image_registry(region, "kmeans"),
|
||||
"((FSX_ID))": get_fsx_id(),
|
||||
"((FSX_SUBNET))": get_fsx_subnet(),
|
||||
"((FSX_SECURITY_GROUP))": get_fsx_security_group(),
|
||||
}
|
||||
|
||||
filedata = ""
|
||||
with open(input_filename, "r") as f:
|
||||
filedata = f.read()
|
||||
for replace_key, replace_value in variables_to_replace.items():
|
||||
if replace_value is None:
|
||||
continue
|
||||
filedata = filedata.replace(replace_key, replace_value)
|
||||
|
||||
with open(output_filename, "w") as f:
|
||||
|
|
|
|||
Loading…
Reference in New Issue