pipelines/components/aws/sagemaker/common/boto3_manager.py

187 lines
6.3 KiB
Python

"""Class for managing boto3 sessions."""
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from boto3.session import Session
from botocore.config import Config
from botocore.credentials import (
AssumeRoleCredentialFetcher,
CredentialResolver,
DeferredRefreshableCredentials,
JSONFileCache,
)
from botocore.session import Session as BotocoreSession
from sagemaker.session import Session as SageMakerSession
class Boto3Manager(object):
"""Provides static methods for boto3 clients."""
class AssumeRoleProvider(object):
"""AWS session provider that is capable of refreshing credentials using
assume role.
Taken from https://github.com/boto/botocore/issues/761#issuecomment-426037853 .
"""
METHOD = "assume-role"
def __init__(self, fetcher):
self._fetcher = fetcher
def load(self):
return DeferredRefreshableCredentials(
self._fetcher.fetch_credentials, self.METHOD
)
@staticmethod
def _get_boto3_session(
region: str, role_arn: str = None, assume_duration: int = 3600
) -> Session:
"""Creates a boto3 session, optionally assuming a role.
Args:
region: The AWS region for the session.
role_arn: The ARN to assume for the session.
assume_duration: The duration (in seconds) to assume the role.
Returns:
object: A boto3 Session.
"""
# By default return a basic session
if not role_arn:
return Session(region_name=region)
# The following assume role example was taken from
# https://github.com/boto/botocore/issues/761#issuecomment-426037853
# Create a session used to assume role
assume_session = BotocoreSession()
fetcher = AssumeRoleCredentialFetcher(
assume_session.create_client,
assume_session.get_credentials(),
role_arn,
extra_args={"DurationSeconds": assume_duration,},
cache=JSONFileCache(),
)
role_session = BotocoreSession()
role_session.register_component(
"credential_provider",
CredentialResolver([Boto3Manager.AssumeRoleProvider(fetcher)]),
)
return Session(region_name=region, botocore_session=role_session)
@staticmethod
def get_sagemaker_client(
component_version: str,
region: str,
endpoint_url: str = None,
assume_role_arn: str = None,
):
"""Builds a client to the AWS SageMaker API.
Args:
component_version: The version of the component to include in
the user agent.
region: The AWS region for the SageMaker client.
endpoint_url: A private link endpoint for SageMaker.
assume_role_arn: The ARN of a role for the boto3 client to assume.
Returns:
object: A SageMaker boto3 client.
"""
session = Boto3Manager._get_boto3_session(region, assume_role_arn)
session_config = Config(
user_agent=f"sagemaker-on-kubeflow-pipelines-v{component_version}",
retries={"max_attempts": 10, "mode": "standard"},
)
client = session.client(
"sagemaker",
region_name=region,
endpoint_url=endpoint_url,
config=session_config,
)
return client
@staticmethod
def get_sagemaker_session(
component_version: str,
region: str,
endpoint_url: str = None,
assume_role_arn: str = None,
):
"""Builds a SageMaker Session which can be used by any Estimator.
Args:
component_version: The version of the component to include in
the user agent.
region: The AWS region for the SageMaker client and SageMaker Session.
endpoint_url: A private link endpoint for SageMaker.
assume_role_arn: The ARN of a role for the boto3 client to assume.
Returns:
object: A SageMaker boto3 session.
"""
return SageMakerSession(
boto_session=Boto3Manager._get_boto3_session(region, assume_role_arn),
sagemaker_client=Boto3Manager.get_sagemaker_client(
component_version, region, endpoint_url, assume_role_arn
),
)
@staticmethod
def get_robomaker_client(
component_version: str,
region: str,
endpoint_url: str = None,
assume_role_arn: str = None,
):
"""Builds a client to the AWS RoboMaker API.
Args:
component_version: The version of the component to include in
the user agent.
region: The AWS region for the RoboMaker client.
endpoint_url: A private link endpoint for RoboMaker.
assume_role_arn: The ARN of a role for the boto3 client to assume.
Returns:
object: A RoboMaker boto3 client.
"""
session = Boto3Manager._get_boto3_session(region, assume_role_arn)
session_config = Config(
user_agent=f"sagemaker-on-kubeflow-pipelines-v{component_version}",
retries={"max_attempts": 10, "mode": "standard"},
)
client = session.client(
"robomaker",
region_name=region,
endpoint_url=endpoint_url,
config=session_config,
)
return client
@staticmethod
def get_cloudwatch_client(region: str, assume_role_arn: str = None):
"""Builds a client to the AWS CloudWatch API.
Args:
region: The AWS region for the CloudWatch client.
Returns:
object: A CloudWatch boto3 client.
"""
session = Boto3Manager._get_boto3_session(region, assume_role_arn)
client = session.client("logs")
return client