feat(components): Adding Minio component for PyTorch - KFP (#5808)

* Adding minio component and tests

Signed-off-by: Shrinath Suresh <shrinath@ideas2it.com>

* Minio executor fix

Signed-off-by: ankan94 <ankan@ideas2it.com>

Co-authored-by: ankan94 <ankan@ideas2it.com>
This commit is contained in:
shrinath-suresh 2021-06-08 10:27:45 +05:30 committed by GitHub
parent c5325db7d2
commit f843121e91
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 430 additions and 2 deletions

View File

@ -0,0 +1,65 @@
#!/usr/bin/env/python3
# Copyright (c) Facebook, Inc. and its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Minio Component Module."""
from pytorch_kfp_components.components.base.base_component import BaseComponent
from pytorch_kfp_components.components.minio.executor import Executor
from pytorch_kfp_components.types import standard_component_specs
class MinIO(BaseComponent): #pylint: disable=too-few-public-methods
"""Minio Component Class."""
def __init__(
self,
source: str,
bucket_name: str,
destination: str,
endpoint: str,
):
"""Initializes the component class.
Args:
source : the source path of artifacts.
bucket_name : minio bucket name.
destination : the destination path in minio
endpoint : minio endpoint url.
"""
super(BaseComponent, self).__init__() #pylint: disable=bad-super-call
input_dict = {
standard_component_specs.MINIO_SOURCE: source,
standard_component_specs.MINIO_BUCKET_NAME: bucket_name,
standard_component_specs.MINIO_DESTINATION: destination,
}
output_dict = {}
exec_properties = {
standard_component_specs.MINIO_ENDPOINT: endpoint,
}
spec = standard_component_specs.MinIoSpec()
self._validate_spec(
spec=spec,
input_dict=input_dict,
output_dict=output_dict,
exec_properties=exec_properties,
)
Executor().Do(
input_dict=input_dict,
output_dict=output_dict,
exec_properties=exec_properties,
)
self.output_dict = output_dict

View File

@ -0,0 +1,187 @@
#!/usr/bin/env/python3
# Copyright (c) Facebook, Inc. and its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Minio Executor Module."""
import os
from pytorch_kfp_components.components.base.base_executor import BaseExecutor
from pytorch_kfp_components.types import standard_component_specs
import urllib3
from minio import Minio #pylint: disable=no-name-in-module
class Executor(BaseExecutor):
"""Minio Executor Class."""
def __init__(self): #pylint: disable=useless-super-delegation
super(Executor, self).__init__() #pylint: disable=super-with-arguments
def _initiate_minio_client(self, minio_config: dict): #pylint: disable=no-self-use
"""Initializes the minio client.
Args:
minio_config : a dict for minio configuration.
Returns:
client : the minio server client
"""
minio_host = minio_config["HOST"]
access_key = minio_config["ACCESS_KEY"]
secret_key = minio_config["SECRET_KEY"]
client = Minio(
minio_host,
access_key=access_key,
secret_key=secret_key,
secure=False,
)
return client
def _read_minio_creds(self, endpoint: str): #pylint: disable=no-self-use
"""Reads the minio credentials.
Args:
endpoint : minio endpoint url
Raises:
ValueError : if minio access key and secret keys are missing
Returns:
minio_config : a dict for minio configuration.
"""
if "MINIO_ACCESS_KEY" not in os.environ:
raise ValueError("Environment variable MINIO_ACCESS_KEY not found")
if "MINIO_SECRET_KEY" not in os.environ:
raise ValueError("Environment variable MINIO_SECRET_KEY not found")
minio_config = {
"HOST": endpoint,
"ACCESS_KEY": os.environ["MINIO_ACCESS_KEY"],
"SECRET_KEY": os.environ["MINIO_SECRET_KEY"],
}
return minio_config
def upload_artifacts_to_minio( #pylint: disable=no-self-use,too-many-arguments
self,
client: Minio,
source: str,
destination: str,
bucket_name: str,
output_dict: dict,
):
"""Uploads artifacts to minio server.
Args:
client : Minio client
source : source path of artifacts.
destination : destination path of artifacts
bucket_name : minio bucket name.
output_dict : dict of output containing destination paths,
source and bucket names
Raises:
Exception : on MaxRetryError, NewConnectionError,
ConnectionError.
Returns:
output_dict : dict of output containing destination paths,
source and bucket names
"""
print(f"source {source} destination {destination}")
try:
client.fput_object(
bucket_name=bucket_name,
file_path=source,
object_name=destination,
)
output_dict[destination] = {
"bucket_name": bucket_name,
"source": source,
}
except (
urllib3.exceptions.MaxRetryError,
urllib3.exceptions.NewConnectionError,
urllib3.exceptions.ConnectionError,
RuntimeError,
) as expection_raised:
print(str(expection_raised))
raise Exception(expection_raised) #pylint: disable=raise-missing-from
return output_dict
def get_fn_args(self, input_dict: dict, exec_properties: dict): #pylint: disable=no-self-use
"""Extracts the source, bucket_name, folder_name from the input_dict
and endpoint from exec_properties.
Args:
input_dict : a dict of inputs having source,
destination etc.
exec_properties : a dict of execution properties,
having minio endpoint.
Returns:
source : source path of artifacts.
bucket_name : name of minio bucket
folder_name : name of folder in which artifacts are uploaded.
endpoint : minio endpoint url.
"""
source = input_dict.get(standard_component_specs.MINIO_SOURCE)
bucket_name = input_dict.get(
standard_component_specs.MINIO_BUCKET_NAME)
folder_name = input_dict.get(
standard_component_specs.MINIO_DESTINATION)
endpoint = exec_properties.get(standard_component_specs.MINIO_ENDPOINT)
return source, bucket_name, folder_name, endpoint
def Do(self, input_dict: dict, output_dict: dict, exec_properties: dict): #pylint: disable=too-many-locals
"""Executes the minio upload process.
Args:
input_dict : a dict of inputs having source, destination etc.
output_dict : dict of output containing destination paths,
source and bucket names
exec_properties : a dict of execution properties,
having minio endpoint.
Raises:
ValueError : for invalid/unknonwn source path
"""
source, bucket_name, folder_name, endpoint = self.get_fn_args(
input_dict=input_dict, exec_properties=exec_properties)
minio_config = self._read_minio_creds(endpoint=endpoint)
client = self._initiate_minio_client(minio_config=minio_config)
if not os.path.exists(source):
raise ValueError("Input path - {} does not exists".format(source))
if os.path.isfile(source):
artifact_name = source.split("/")[-1]
destination = os.path.join(folder_name, artifact_name)
self.upload_artifacts_to_minio(
client=client,
source=source,
destination=destination,
bucket_name=bucket_name,
output_dict=output_dict,
)
elif os.path.isdir(source):
for root, dirs, files in os.walk(source): #pylint: disable=unused-variable
for file in files:
source = os.path.join(root, file)
artifact_name = source.split("/")[-1]
destination = os.path.join(folder_name, artifact_name)
self.upload_artifacts_to_minio(
client=client,
source=source,
destination=destination,
bucket_name=bucket_name,
output_dict=output_dict,
)
else:
raise ValueError("Unknown source: {} ".format(source))

View File

@ -37,6 +37,20 @@ MAR_CONFIG_CONFIG_PROPERTIES = "config_properties"
MAR_CONFIG_REQUIREMENTS_FILE = "requirements_file"
MAR_CONFIG_EXTRA_FILES = "extra_files"
VIZ_MLPIPELINE_UI_METADATA = "mlpipeline_ui_metadata"
VIZ_MLPIPELINE_METRICS = "mlpipeline_metrics"
VIZ_CONFUSION_MATRIX_DICT = "confusion_matrix_dict"
VIZ_TEST_ACCURACY = "test_accuracy"
VIZ_MARKDOWN = "markdown"
VIZ_MARKDOWN_DICT_SOURCE = "source"
VIZ_MARKDOWN_DICT_STORAGE = "storage"
VIZ_CONFUSION_MATRIX_ACTUALS = "actuals"
VIZ_CONFUSION_MATRIX_PREDS = "preds"
VIZ_CONFUSION_MATRIX_CLASSES = "classes"
VIZ_CONFUSION_MATRIX_URL = "url"
MINIO_SOURCE = "source"
MINIO_BUCKET_NAME = "bucket_name"
MINIO_DESTINATION = "destination"
@ -97,9 +111,39 @@ class MarGenerationSpec: # pylint: disable=R0903
}
class VisualizationSpec:
"""Visualization Specification class.
For validating the parameter 'type'
"""
INPUT_DICT = {
VIZ_CONFUSION_MATRIX_DICT: Parameters(type=dict, optional=True),
VIZ_TEST_ACCURACY: Parameters(type=float, optional=True),
VIZ_MARKDOWN: Parameters(type=dict, optional=True),
}
OUTPUT_DICT = {}
EXECUTION_PROPERTIES = {
VIZ_MLPIPELINE_UI_METADATA: Parameters(type=str, optional=True),
VIZ_MLPIPELINE_METRICS: Parameters(type=str, optional=True),
}
MARKDOWN_DICT = {
VIZ_MARKDOWN_DICT_STORAGE: Parameters(type=str, optional=False),
VIZ_MARKDOWN_DICT_SOURCE: Parameters(type=dict, optional=False),
}
CONFUSION_MATRIX_DICT = {
VIZ_CONFUSION_MATRIX_ACTUALS: Parameters(type=list, optional=False),
VIZ_CONFUSION_MATRIX_PREDS: Parameters(type=list, optional=False),
VIZ_CONFUSION_MATRIX_CLASSES: Parameters(type=list, optional=False),
VIZ_CONFUSION_MATRIX_URL: Parameters(type=str, optional=False),
}
class MinIoSpec:
"""Minio Specification class.
For validating the parameter 'type' .
"""MinIO Specification class.
For validating the parameter 'type'
"""
INPUT_DICT = {
MINIO_SOURCE: Parameters(type=str),

View File

@ -0,0 +1,132 @@
#!/usr/bin/env/python3
# Copyright (c) Facebook, Inc. and its affiliates.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Unit Tests for Minio Component."""
import tempfile
import os
import mock
import pytest
from pytorch_kfp_components.components.minio.component import MinIO
from pytorch_kfp_components.components.minio.executor import Executor
tmpdir = tempfile.mkdtemp()
with open(os.path.join(str(tmpdir), "dummy.txt"), "w") as fp:
fp.write("dummy")
#pylint: disable=redefined-outer-name
#pylint: disable=invalid-name
@pytest.fixture(scope="class")
def minio_inputs():
"""Sets the minio inputs.
Returns:
minio_inputs : dict of inputs for minio uploads.
"""
minio_input = {
"bucket_name": "dummy",
"source": f"{tmpdir}/dummy.txt",
"destination": "dummy.txt",
"endpoint": "localhost:9000",
}
return minio_input
def upload_to_minio(minio_inputs):
"""Uoloads the artifact to minio."""
MinIO(
source=minio_inputs["source"],
bucket_name=minio_inputs["bucket_name"],
destination=minio_inputs["destination"],
endpoint=minio_inputs["endpoint"],
)
@pytest.mark.parametrize(
"key",
["source", "bucket_name", "destination", "endpoint"],
)
def test_minio_variables_invalid_type(minio_inputs, key):
"""Testing for invalid variable types."""
minio_inputs[key] = ["test"]
expected_exception_msg = f"{key} must be of type <class 'str'>" \
f" but received as {type(minio_inputs[key])}"
with pytest.raises(TypeError, match=expected_exception_msg):
upload_to_minio(minio_inputs)
@pytest.mark.parametrize(
"key",
["source", "bucket_name", "destination", "endpoint"],
)
def test_minio_mandatory_param(minio_inputs, key):
"""Testing for invalid minio mandatory parameters."""
minio_inputs[key] = None
expected_exception_msg = (
f"{key} is not optional. Received value: {minio_inputs[key]}")
with pytest.raises(ValueError, match=expected_exception_msg):
upload_to_minio(minio_inputs)
def test_missing_access_key(minio_inputs):
"""Test upload if minio access key is missing."""
os.environ["MINIO_SECRET_KEY"] = "dummy"
expected_exception_msg = "Environment variable MINIO_ACCESS_KEY not found"
with pytest.raises(ValueError, match=expected_exception_msg):
upload_to_minio(minio_inputs)
os.environ.pop("MINIO_SECRET_KEY")
def test_missing_secret_key(minio_inputs):
"""Test upload if minio secret key is missing."""
os.environ["MINIO_ACCESS_KEY"] = "dummy"
expected_exception_msg = "Environment variable MINIO_SECRET_KEY not found"
with pytest.raises(ValueError, match=expected_exception_msg):
upload_to_minio(minio_inputs)
os.environ.pop("MINIO_ACCESS_KEY")
def test_unreachable_endpoint(minio_inputs):
"""Testing unreachable minio endpoint with invalid minio creds."""
os.environ["MINIO_ACCESS_KEY"] = "dummy"
os.environ["MINIO_SECRET_KEY"] = "dummy"
with pytest.raises(Exception, match="Max retries exceeded with url*"):
upload_to_minio(minio_inputs)
def test_invalid_file_path(minio_inputs):
"""Test invalid source file path."""
minio_inputs["source"] = "dummy"
expected_exception_msg = (
f"Input path - {minio_inputs['source']} does not exists")
with pytest.raises(ValueError, match=expected_exception_msg):
upload_to_minio(minio_inputs)
def test_minio_upload_file(minio_inputs):
"""Testing upload of files to minio."""
with mock.patch.object(Executor, "upload_artifacts_to_minio") as client:
client.return_value = []
upload_to_minio(minio_inputs)
client.asser_called_once()
def test_minio_upload_folder(minio_inputs):
"""Testing upload of folder to minio."""
minio_inputs["source"] = tmpdir
with mock.patch.object(Executor, "upload_artifacts_to_minio") as client:
client.return_value = []
upload_to_minio(minio_inputs)
client.asser_called_once()