#!/usr/bin/env python3
#
# Copyright (c) Facebook, Inc. and its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#pylint: disable=not-callable,unused-variable
"""Test for component compilation."""
import os
import unittest
import json
import pytest
from kfp import components
from kfp.components import load_component_from_file
from kfp import dsl
from kfp import compiler
tests_dir, _ = os.path.split(os.path.abspath(__file__))
templates_dir = os.path.join(os.path.dirname(tests_dir), "templates")
components_dir = tests_dir.split("PyTorch")[0].rstrip("/")
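# templates_dir resolves to the "templates" folder next to this tests
# directory; components_dir is the components root above "PyTorch", which
# holds the kserve deploy component. Both test suites below reuse the same
# generic preprocess/train templates.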
BERT_COMPONENTS = {
    "component_bert_prep": components.load_component_from_file(
        f"{templates_dir}/preprocess_component.yaml"
    ),
    "component_bert_train": components.load_component_from_file(
        f"{templates_dir}/train_component.yaml"
    ),
}
CIFAR_COMPONENTS = {
    "component_cifar10_prep": components.load_component_from_file(
        f"{templates_dir}/preprocess_component.yaml"
    ),
    "component_cifar10_train": components.load_component_from_file(
        f"{templates_dir}/train_component.yaml"
    ),
}
COMPONENT_TB = load_component_from_file(
f"{templates_dir}/tensorboard_component.yaml"
)
COMPONENT_DEPLOY = load_component_from_file(
f"{components_dir}/kserve/component.yaml"
)
COMPONENT_MINIO = components.load_component_from_file(
f"{templates_dir}/minio_component.yaml"
)
PRED_OP = load_component_from_file(f"{templates_dir}/prediction_component.yaml")
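
# These tests only build and type-check the pipeline DAGs with the KFP
# compiler; nothing is deployed, so no cluster is needed to run them.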


class ComponentCompileTest(unittest.TestCase): #pylint: disable=too-many-instance-attributes
    """Test cases for compilation of yamls."""

def setUp(self):
"""Set up."""
super().setUp()
self.input_request = "./compile_test.json"
self.deploy_name_bert = "bertserve"
self.namespace = "kubeflow-user-example-com"
self.experiment = "Default"
self.minio_endpoint = "http://minio-service.kubeflow:9000"
self.log_bucket = "mlpipeline"
self.tensorboard_image = "public.ecr.aws/pytorch-samples/tboard:latest"
self.deploy_name_cifar = "torchserve"
self.model_name = "cifar10"
        self.isvc_name = (
            f"{self.deploy_name_cifar}.{self.namespace}.example.com"
        )
self.cookie = ""
self.ingress_gateway = (
"http://istio-ingressgateway.istio-system.svc.cluster.local"
)

    def test_cifar10_compile(self):
        """Test that the Cifar10 component YAMLs compile into a pipeline."""
@dsl.pipeline(
name="Training Cifar10 pipeline",
description="Cifar 10 dataset pipeline",
) #pylint: disable=too-many-arguments,too-many-locals
def pytorch_cifar10(
minio_endpoint=self.minio_endpoint,
log_bucket=self.log_bucket,
log_dir=f"tensorboard/logs/{dsl.RUN_ID_PLACEHOLDER}",
mar_path=f"mar/{dsl.RUN_ID_PLACEHOLDER}/model-store",
config_prop_path=f"mar/{dsl.RUN_ID_PLACEHOLDER}/config",
model_uri=f"s3://mlpipeline/mar/{dsl.RUN_ID_PLACEHOLDER}",
tf_image=self.tensorboard_image,
deploy=self.deploy_name_cifar,
namespace=self.namespace,
confusion_matrix_log_dir=f"confusion_matrix"
f"/{dsl.RUN_ID_PLACEHOLDER}/",
checkpoint_dir="checkpoint_dir/cifar10",
):
"""Cifar10 pipelines."""
pod_template_spec = json.dumps({
"spec": {
"containers": [{
"env": [
{
"name": "AWS_ACCESS_KEY_ID",
"valueFrom": {
"secretKeyRef": {
"name": "mlpipeline-minio-artifact",
"key": "accesskey",
}
},
},
{
"name": "AWS_SECRET_ACCESS_KEY",
"valueFrom": {
"secretKeyRef": {
"name": "mlpipeline-minio-artifact",
"key": "secretkey",
}
},
},
{
"name": "AWS_REGION",
"value": "minio"
},
{
"name": "S3_ENDPOINT",
"value": f"{minio_endpoint}",
},
{
"name": "S3_USE_HTTPS",
"value": "0"
},
{
"name": "S3_VERIFY_SSL",
"value": "0"
},
]
}]
}
})
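            # The pod template above injects MinIO credentials from the
            # "mlpipeline-minio-artifact" Kubernetes secret so the tensorboard
            # image can read event files from the S3-compatible MinIO endpoint.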
prepare_tb_task = COMPONENT_TB(
log_dir_uri=f"s3://{log_bucket}/{log_dir}",
image=tf_image,
pod_template_spec=pod_template_spec,
).set_display_name("Visualization")
component_cifar10_prep = CIFAR_COMPONENTS["component_cifar10_prep"]
prep_task = (
component_cifar10_prep().after(prepare_tb_task).
set_display_name("Preprocess & Transform")
)
            component_cifar10_train = CIFAR_COMPONENTS[
                "component_cifar10_train"]
confusion_matrix_url = \
f"minio://{log_bucket}/{confusion_matrix_log_dir}"
script_args = f"model_name=resnet.pth," \
f"confusion_matrix_url={confusion_matrix_url}"
# For gpus, set number of gpus and accelerator type
ptl_args = "max_epochs=1, " \
"gpus=0, " \
"accelerator=None, " \
"profiler=pytorch"
train_task = (
component_cifar10_train(
input_data=prep_task.outputs["output_data"],
script_args=script_args,
ptl_arguments=ptl_args
).after(prep_task).set_display_name("Training")
)
minio_tb_upload = (
                COMPONENT_MINIO(
bucket_name="mlpipeline",
folder_name=log_dir,
input_path=train_task.outputs["tensorboard_root"],
filename="",
).after(train_task
).set_display_name("Tensorboard Events Pusher")
)
minio_checkpoint_dir_upload = (
                COMPONENT_MINIO(
bucket_name="mlpipeline",
folder_name=checkpoint_dir,
input_path=train_task.outputs["checkpoint_dir"],
filename="",
).after(train_task).set_display_name("checkpoint_dir Pusher")
)
minio_mar_upload = (
                COMPONENT_MINIO(
bucket_name="mlpipeline",
folder_name=mar_path,
input_path=train_task.outputs["checkpoint_dir"],
filename="cifar10_test.mar",
).after(train_task).set_display_name("Mar Pusher")
)
minio_config_upload = (
                COMPONENT_MINIO(
bucket_name="mlpipeline",
folder_name=config_prop_path,
input_path=train_task.outputs["checkpoint_dir"],
filename="config.properties",
).after(train_task).set_display_name("Conifg Pusher")
)
            # Render the pipeline-parameter placeholder into the YAML manifest.
            model_uri = str(model_uri)
isvc_yaml = """
apiVersion: "serving.kubeflow.org/v1beta1"
kind: "InferenceService"
metadata:
name: {}
namespace: {}
spec:
predictor:
serviceAccountName: sa
pytorch:
storageUri: {}
resources:
limits:
memory: 4Gi
""".format(deploy, namespace, model_uri)
# For GPU inference use below yaml with gpu count and accelerator
gpu_count = "1"
accelerator = "nvidia-tesla-p4"
isvc_gpu_yaml = """
apiVersion: "serving.kubeflow.org/v1beta1"
kind: "InferenceService"
metadata:
name: {}
namespace: {}
spec:
predictor:
serviceAccountName: sa
pytorch:
storageUri: {}
resources:
limits:
memory: 4Gi
nvidia.com/gpu: {}
nodeSelector:
cloud.google.com/gke-accelerator: {}
""".format(deploy, namespace, model_uri, gpu_count, accelerator)
# Update inferenceservice_yaml for GPU inference
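            # For example (not exercised here), the GPU manifest above could be
            # passed to the deployer instead:
            #   COMPONENT_DEPLOY(action="apply", inferenceservice_yaml=isvc_gpu_yaml)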
deploy_task = (
COMPONENT_DEPLOY(
action="apply", inferenceservice_yaml=isvc_yaml
).after(minio_mar_upload).set_display_name("Deployer")
)
pred_task = (
PRED_OP(
host_name=self.isvc_name,
input_request=self.input_request,
cookie=self.cookie,
url=self.ingress_gateway,
model=self.model_name,
inference_type="predict",
).after(deploy_task).set_display_name("Prediction")
)
explain_task = (
PRED_OP(
host_name=self.isvc_name,
input_request=self.input_request,
cookie=self.cookie,
url=self.ingress_gateway,
model=self.model_name,
inference_type="explain",
).after(pred_task).set_display_name("Explanation")
)
compiler.Compiler().compile(
pytorch_cifar10, "pytorch.tar.gz", type_check=True
)

    def test_bert_compile(self):
        """Test that the BERT component YAMLs compile into a pipeline."""
@dsl.pipeline(
name="Training pipeline", description="Sample training job test"
) #pylint: disable=too-many-arguments,too-many-locals
def pytorch_bert(
minio_endpoint=self.minio_endpoint,
log_bucket=self.log_bucket,
log_dir=f"tensorboard/logs/{dsl.RUN_ID_PLACEHOLDER}",
mar_path=f"mar/{dsl.RUN_ID_PLACEHOLDER}/model-store",
config_prop_path=f"mar/{dsl.RUN_ID_PLACEHOLDER}/config",
model_uri=f"s3://mlpipeline/mar/{dsl.RUN_ID_PLACEHOLDER}",
tf_image=self.tensorboard_image,
deploy=self.deploy_name_bert,
namespace=self.namespace,
            confusion_matrix_log_dir=(
                f"confusion_matrix/{dsl.RUN_ID_PLACEHOLDER}/"
            ),
num_samples=1000,
max_epochs=1
):
"""Bert Pipeline."""
prepare_tb_task = COMPONENT_TB(
log_dir_uri=f"s3://{log_bucket}/{log_dir}",
image=tf_image,
pod_template_spec=json.dumps({
"spec": {
"containers": [{
"env": [
{
"name": "AWS_ACCESS_KEY_ID",
"valueFrom": {
"secretKeyRef": {
"name": "mlpipeline-minio-artifact",
"key": "accesskey",
}
},
},
{
"name": "AWS_SECRET_ACCESS_KEY",
"valueFrom": {
"secretKeyRef": {
"name": "mlpipeline-minio-artifact",
"key": "secretkey",
}
},
},
{
"name": "AWS_REGION",
"value": "minio",
},
{
"name": "S3_ENDPOINT",
"value": f"{minio_endpoint}",
},
{
"name": "S3_USE_HTTPS",
"value": "0"
},
{
"name": "S3_VERIFY_SSL",
"value": "0"
},
]
}]
}
}),
).set_display_name("Visualization")
component_bert_prep = BERT_COMPONENTS["component_bert_prep"]
prep_task = (
component_bert_prep().after(prepare_tb_task).
set_display_name("Preprocess & Transform")
)
component_bert_train = BERT_COMPONENTS["component_bert_train"]
confusion_matrix_url = \
f"minio://{log_bucket}/{confusion_matrix_log_dir}"
script_args = f"model_name=bert.pth," \
f"num_samples={num_samples}," \
f"confusion_matrix_url={confusion_matrix_url}"
# For gpus, set number of gpus and accelerator type
ptl_args = f"max_epochs={max_epochs}," \
"profiler=pytorch," \
"gpus=0," \
"accelerator=None"
train_task = (
component_bert_train(
input_data=prep_task.outputs["output_data"],
script_args=script_args,
ptl_arguments=ptl_args
).after(prep_task).set_display_name("Training")
)
minio_tb_upload = (
                COMPONENT_MINIO(
bucket_name="mlpipeline",
folder_name=log_dir,
input_path=train_task.outputs["tensorboard_root"],
filename="",
).after(train_task
).set_display_name("Tensorboard Events Pusher")
)
minio_mar_upload = (
                COMPONENT_MINIO(
bucket_name="mlpipeline",
folder_name=mar_path,
input_path=train_task.outputs["checkpoint_dir"],
filename="bert_test.mar",
).after(train_task).set_display_name("Mar Pusher")
)
minio_config_upload = (
                COMPONENT_MINIO(
bucket_name="mlpipeline",
folder_name=config_prop_path,
input_path=train_task.outputs["checkpoint_dir"],
filename="config.properties",
).after(train_task).set_display_name("Conifg Pusher")
)
model_uri = str(model_uri)
isvc_yaml = """
apiVersion: "serving.kubeflow.org/v1beta1"
kind: "InferenceService"
metadata:
name: {}
namespace: {}
spec:
predictor:
serviceAccountName: sa
pytorch:
storageUri: {}
resources:
limits:
memory: 4Gi
""".format(deploy, namespace, model_uri)
# For GPU inference use below yaml with gpu count and accelerator
gpu_count = "1"
accelerator = "nvidia-tesla-p4"
isvc_gpu_yaml = """
apiVersion: "serving.kubeflow.org/v1beta1"
kind: "InferenceService"
metadata:
name: {}
namespace: {}
spec:
predictor:
serviceAccountName: sa
pytorch:
storageUri: {}
resources:
limits:
memory: 4Gi
nvidia.com/gpu: {}
nodeSelector:
cloud.google.com/gke-accelerator: {}
""".format(deploy, namespace, model_uri, gpu_count, accelerator)
# Update inferenceservice_yaml for GPU inference
deploy_task = (
COMPONENT_DEPLOY(
action="apply", inferenceservice_yaml=isvc_yaml
).after(minio_mar_upload).set_display_name("Deployer")
)
compiler.Compiler().compile(
pytorch_bert, "pytorch.tar.gz", type_check=True
)

    def test_cifar10_compile_fail(self):
        """Test that Cifar10 compilation fails with an invalid component."""
@dsl.pipeline(
name="Training Cifar10 pipeline",
description="Cifar 10 dataset pipeline",
) #pylint: disable=too-many-arguments,too-many-locals
def pytorch_cifar10(
minio_endpoint=self.minio_endpoint,
log_bucket=self.log_bucket,
log_dir=f"tensorboard/logs/{dsl.RUN_ID_PLACEHOLDER}",
mar_path=f"mar/{dsl.RUN_ID_PLACEHOLDER}/model-store",
config_prop_path=f"mar/{dsl.RUN_ID_PLACEHOLDER}/config",
model_uri=f"s3://mlpipeline/mar/{dsl.RUN_ID_PLACEHOLDER}",
tf_image=self.tensorboard_image,
deploy=self.deploy_name_cifar,
namespace=self.namespace,
confusion_matrix_log_dir=f"confusion_matrix"
f"/{dsl.RUN_ID_PLACEHOLDER}/", #pylint: disable=f-string-without-interpolation
checkpoint_dir=f"checkpoint_dir/cifar10",
):
"""Cifar10 pipelines."""
pod_template_spec = json.dumps({
"spec": {
"containers": [{
"env": [
{
"name": "AWS_ACCESS_KEY_ID",
"valueFrom": {
"secretKeyRef": {
"name": "mlpipeline-minio-artifact",
"key": "accesskey",
}
},
},
{
"name": "AWS_SECRET_ACCESS_KEY",
"valueFrom": {
"secretKeyRef": {
"name": "mlpipeline-minio-artifact",
"key": "secretkey",
}
},
},
{
"name": "AWS_REGION",
"value": "minio"
},
{
"name": "S3_ENDPOINT",
"value": f"{minio_endpoint}",
},
{
"name": "S3_USE_HTTPS",
"value": "0"
},
{
"name": "S3_VERIFY_SSL",
"value": "0"
},
]
}]
}
})
prepare_tb_task = COMPONENT_TB(
log_dir_uri=f"s3://{log_bucket}/{log_dir}",
image=tf_image,
pod_template_spec=pod_template_spec,
).set_display_name("Visualization")
component_cifar10_prep = ""
prep_task = (
component_cifar10_prep().after(prepare_tb_task).
set_display_name("Preprocess & Transform")
)
confusion_matrix_url = \
f"minio://{log_bucket}/{confusion_matrix_log_dir}"
script_args = f"model_name=resnet.pth," \
f"confusion_matrix_url={confusion_matrix_url}"
# For gpus, set number of gpus and accelerator type
ptl_args = "max_epochs=1, " \
"gpus=0, " \
"accelerator=None, " \
"profiler=pytorch"
component_cifar10_train = ""
train_task = (
component_cifar10_train(
input_data=prep_task.outputs["output_data"],
script_args=script_args,
ptl_arguments=ptl_args
).after(prep_task).set_display_name("Training")
)
minio_tb_upload = (
                COMPONENT_MINIO(
bucket_name="mlpipeline",
folder_name=log_dir,
input_path=train_task.outputs["tensorboard_root"],
filename="",
).after(train_task
).set_display_name("Tensorboard Events Pusher")
)
minio_checkpoint_dir_upload = (
                COMPONENT_MINIO(
bucket_name="mlpipeline",
folder_name=checkpoint_dir,
input_path=train_task.outputs["checkpoint_dir"],
filename="",
).after(train_task).set_display_name("checkpoint_dir Pusher")
)
minio_mar_upload = (
                COMPONENT_MINIO(
bucket_name="mlpipeline",
folder_name=mar_path,
input_path=train_task.outputs["checkpoint_dir"],
filename="cifar10_test.mar",
).after(train_task).set_display_name("Mar Pusher")
)
minio_config_upload = (
                COMPONENT_MINIO(
bucket_name="mlpipeline",
folder_name=config_prop_path,
input_path=train_task.outputs["checkpoint_dir"],
filename="config.properties",
).after(train_task).set_display_name("Conifg Pusher")
)
model_uri = str(model_uri)
isvc_yaml = """
apiVersion: "serving.kubeflow.org/v1beta1"
kind: "InferenceService"
metadata:
name: {}
namespace: {}
spec:
predictor:
serviceAccountName: sa
pytorch:
storageUri: {}
resources:
limits:
memory: 4Gi
""".format(deploy, namespace, model_uri)
# For GPU inference use below yaml with gpu count and accelerator
gpu_count = "1"
accelerator = "nvidia-tesla-p4"
isvc_gpu_yaml = """
apiVersion: "serving.kubeflow.org/v1beta1"
kind: "InferenceService"
metadata:
name: {}
namespace: {}
spec:
predictor:
serviceAccountName: sa
pytorch:
storageUri: {}
resources:
limits:
memory: 4Gi
nvidia.com/gpu: {}
nodeSelector:
cloud.google.com/gke-accelerator: {}
""".format(deploy, namespace, model_uri, gpu_count, accelerator)
# Update inferenceservice_yaml for GPU inference
deploy_task = (
COMPONENT_DEPLOY(
action="apply", inferenceservice_yaml=isvc_yaml
).after(minio_mar_upload).set_display_name("Deployer")
)
pred_task = (
PRED_OP(
host_name=self.isvc_name,
input_request=self.input_request,
cookie=self.cookie,
url=self.ingress_gateway,
model=self.model_name,
inference_type="predict",
).after(deploy_task).set_display_name("Prediction")
)
explain_task = (
PRED_OP(
host_name=self.isvc_name,
input_request=self.input_request,
cookie=self.cookie,
url=self.ingress_gateway,
model=self.model_name,
inference_type="explain",
).after(pred_task).set_display_name("Explanation")
)
with pytest.raises(TypeError):
compiler.Compiler().compile(
pytorch_cifar10, "pytorch.tar.gz", type_check=True
)

    def test_bert_compile_fail(self):
        """Test that BERT compilation fails with an invalid component."""
@dsl.pipeline(
name="Training pipeline", description="Sample training job test"
) #pylint: disable=too-many-arguments,too-many-locals
def pytorch_bert(
minio_endpoint=self.minio_endpoint,
log_bucket=self.log_bucket,
log_dir=f"tensorboard/logs/{dsl.RUN_ID_PLACEHOLDER}",
mar_path=f"mar/{dsl.RUN_ID_PLACEHOLDER}/model-store",
config_prop_path=f"mar/{dsl.RUN_ID_PLACEHOLDER}/config",
model_uri=f"s3://mlpipeline/mar/{dsl.RUN_ID_PLACEHOLDER}",
tf_image=self.tensorboard_image,
deploy=self.deploy_name_bert,
namespace=self.namespace,
            confusion_matrix_log_dir=(
                f"confusion_matrix/{dsl.RUN_ID_PLACEHOLDER}/"
            ),
num_samples=1000,
max_epochs=1
):
"""Bert Pipeline."""
prepare_tb_task = COMPONENT_TB(
log_dir_uri=f"s3://{log_bucket}/{log_dir}",
image=tf_image,
pod_template_spec=json.dumps({
"spec": {
"containers": [{
"env": [
{
"name": "AWS_ACCESS_KEY_ID",
"valueFrom": {
"secretKeyRef": {
"name": "mlpipeline-minio-artifact",
"key": "accesskey",
}
},
},
{
"name": "AWS_SECRET_ACCESS_KEY",
"valueFrom": {
"secretKeyRef": {
"name": "mlpipeline-minio-artifact",
"key": "secretkey",
}
},
},
{
"name": "AWS_REGION",
"value": "minio",
},
{
"name": "S3_ENDPOINT",
"value": f"{minio_endpoint}",
},
{
"name": "S3_USE_HTTPS",
"value": "0"
},
{
"name": "S3_VERIFY_SSL",
"value": "0"
},
]
}]
}
}),
).set_display_name("Visualization")
component_bert_prep = ""
prep_task = (
component_bert_prep().after(prepare_tb_task).
set_display_name("Preprocess & Transform")
)
confusion_matrix_url = \
f"minio://{log_bucket}/{confusion_matrix_log_dir}"
script_args = f"model_name=bert.pth," \
f"num_samples={num_samples}," \
f"confusion_matrix_url={confusion_matrix_url}"
# For gpus, set number of gpus and accelerator type
ptl_args = f"max_epochs={max_epochs}," \
"profiler=pytorch," \
"gpus=0," \
"accelerator=None"
component_bert_train = ""
train_task = (
component_bert_train(
input_data=prep_task.outputs["output_data"],
script_args=script_args,
ptl_arguments=ptl_args
).after(prep_task).set_display_name("Training")
)
minio_tb_upload = (
                COMPONENT_MINIO(
bucket_name="mlpipeline",
folder_name=log_dir,
input_path=train_task.outputs["tensorboard_root"],
filename="",
).after(train_task
).set_display_name("Tensorboard Events Pusher")
)
minio_mar_upload = (
                COMPONENT_MINIO(
bucket_name="mlpipeline",
folder_name=mar_path,
input_path=train_task.outputs["checkpoint_dir"],
filename="bert_test.mar",
).after(train_task).set_display_name("Mar Pusher")
)
minio_config_upload = (
                COMPONENT_MINIO(
bucket_name="mlpipeline",
folder_name=config_prop_path,
input_path=train_task.outputs["checkpoint_dir"],
filename="config.properties",
).after(train_task).set_display_name("Conifg Pusher")
)
model_uri = str(model_uri)
isvc_yaml = """
apiVersion: "serving.kubeflow.org/v1beta1"
kind: "InferenceService"
metadata:
name: {}
namespace: {}
spec:
predictor:
serviceAccountName: sa
pytorch:
storageUri: {}
resources:
limits:
memory: 4Gi
""".format(deploy, namespace, model_uri)
# For GPU inference use below yaml with gpu count and accelerator
gpu_count = "1"
accelerator = "nvidia-tesla-p4"
isvc_gpu_yaml = """
apiVersion: "serving.kubeflow.org/v1beta1"
kind: "InferenceService"
metadata:
name: {}
namespace: {}
spec:
predictor:
serviceAccountName: sa
pytorch:
storageUri: {}
resources:
limits:
memory: 4Gi
nvidia.com/gpu: {}
nodeSelector:
cloud.google.com/gke-accelerator: {}
""".format(deploy, namespace, model_uri, gpu_count, accelerator)
# Update inferenceservice_yaml for GPU inference
deploy_task = (
COMPONENT_DEPLOY(
action="apply", inferenceservice_yaml=isvc_yaml
).after(minio_mar_upload).set_display_name("Deployer")
)
with pytest.raises(TypeError):
compiler.Compiler().compile(
pytorch_bert, "pytorch.tar.gz", type_check=True
)
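

# Standard entry point so the suite can also be run directly with
# `python test_compile_yamls.py` (in addition to pytest discovery).
if __name__ == "__main__":
    unittest.main()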