ContainerBuilder loading kube config (#1795)
* avoid istio injector in the container builder * find the correct namespace * configure default ns to kubeflow if out of cluster; fix unit tests * container build default gcs bucket * resolve comments * code refactor; add create_bucket_if_not_exist in containerbuilder * support load kube config and output error, good for ai platform notebooks/local notebooks * remove create_bucket_if_not_exist param
This commit is contained in:
parent
17e0efe51d
commit
243b88dbac
|
|
@ -18,32 +18,54 @@ import tempfile
|
|||
import os
|
||||
import uuid
|
||||
|
||||
SERVICEACCOUNT_NAMESPACE = "/var/run/secrets/kubernetes.io/serviceaccount/namespace"
|
||||
SERVICEACCOUNT_NAMESPACE = '/var/run/secrets/kubernetes.io/serviceaccount/namespace'
|
||||
GCS_STAGING_BLOB_DEFAULT_PREFIX = 'kfp_container_build_staging'
|
||||
|
||||
class ContainerBuilder(object):
|
||||
"""
|
||||
ContainerBuilder helps build a container image
|
||||
"""
|
||||
def __init__(self, gcs_staging, namespace=None):
|
||||
def __init__(self, gcs_staging=None, namespace=None):
|
||||
"""
|
||||
Args:
|
||||
gcs_staging (str): GCS blob that can store temporary build files
|
||||
gcs_staging (str): GCS bucket/blob that can store temporary build files,
|
||||
default is gs://PROJECT_ID/kfp_container_build_staging.
|
||||
namespace (str): kubernetes namespace where the pod is launched,
|
||||
default is the same namespace as the notebook service account in cluster
|
||||
or 'kubeflow' if not in cluster
|
||||
"""
|
||||
if not gcs_staging.startswith('gs://'):
|
||||
raise ValueError('Error: {} should be a GCS path.'.format(gcs_staging))
|
||||
self._gcs_staging = gcs_staging
|
||||
if gcs_staging is None:
|
||||
gcs_bucket = self._get_project_id()
|
||||
self._gcs_staging = 'gs://' + gcs_bucket + '/' + GCS_STAGING_BLOB_DEFAULT_PREFIX
|
||||
else:
|
||||
from pathlib import PurePath
|
||||
path = PurePath(gcs_staging).parts
|
||||
if len(path) < 2 or not path[0].startswith('gs'):
|
||||
raise ValueError('Error: {} should be a GCS path.'.format(gcs_staging))
|
||||
gcs_bucket = path[1]
|
||||
self._gcs_staging = gcs_staging
|
||||
from ._gcs_helper import GCSHelper
|
||||
GCSHelper.create_gcs_bucket_if_not_exist(gcs_bucket)
|
||||
|
||||
self._namespace = namespace
|
||||
if namespace is None:
|
||||
import os
|
||||
if os.path.exists(SERVICEACCOUNT_NAMESPACE):
|
||||
with open(SERVICEACCOUNT_NAMESPACE, 'r') as f:
|
||||
self._namespace = f.read()
|
||||
else:
|
||||
self._namespace = 'kubeflow'
|
||||
|
||||
def _get_project_id(self):
|
||||
import requests
|
||||
URL = "http://metadata.google.internal/computeMetadata/v1/project/project-id"
|
||||
headers = {
|
||||
'Metadata-Flavor': 'Google'
|
||||
}
|
||||
r = requests.get(url = URL, headers = headers)
|
||||
if not r.ok:
|
||||
raise RuntimeError('ContainerBuilder failed to retrieve the project id.')
|
||||
return r.text
|
||||
|
||||
def _generate_kaniko_spec(self, context, docker_filename, target_image):
|
||||
"""_generate_kaniko_yaml generates kaniko job yaml based on a template yaml """
|
||||
content = {
|
||||
|
|
|
|||
|
|
@ -19,6 +19,12 @@ class GCSHelper(object):
|
|||
|
||||
@staticmethod
|
||||
def get_blob_from_gcs_uri(gcs_path):
|
||||
"""
|
||||
Args:
|
||||
gcs_path (str) : gcs blob path
|
||||
Returns:
|
||||
gcs_blob: gcs blob object(https://github.com/googleapis/google-cloud-python/blob/5c9bb42cb3c9250131cfeef6e0bafe8f4b7c139f/storage/google/cloud/storage/blob.py#L105)
|
||||
"""
|
||||
from google.cloud import storage
|
||||
pure_path = PurePath(gcs_path)
|
||||
gcs_bucket = pure_path.parts[1]
|
||||
|
|
@ -30,15 +36,43 @@ class GCSHelper(object):
|
|||
|
||||
@staticmethod
|
||||
def upload_gcs_file(local_path, gcs_path):
|
||||
"""
|
||||
Args:
|
||||
local_path (str): local file path
|
||||
gcs_path (str) : gcs blob path
|
||||
"""
|
||||
blob = GCSHelper.get_blob_from_gcs_uri(gcs_path)
|
||||
blob.upload_from_filename(local_path)
|
||||
|
||||
@staticmethod
|
||||
def remove_gcs_blob(gcs_path):
|
||||
"""
|
||||
Args:
|
||||
gcs_path (str) : gcs blob path
|
||||
"""
|
||||
blob = GCSHelper.get_blob_from_gcs_uri(gcs_path)
|
||||
blob.delete()
|
||||
|
||||
@staticmethod
|
||||
def download_gcs_blob(local_path, gcs_path):
|
||||
"""
|
||||
Args:
|
||||
local_path (str): local file path
|
||||
gcs_path (str) : gcs blob path
|
||||
"""
|
||||
blob = GCSHelper.get_blob_from_gcs_uri(gcs_path)
|
||||
blob.download_to_filename(local_path)
|
||||
|
||||
@staticmethod
|
||||
def create_gcs_bucket_if_not_exist(gcs_bucket):
|
||||
"""
|
||||
Args:
|
||||
gcs_bucket (str) : gcs bucket name
|
||||
"""
|
||||
from google.cloud import storage
|
||||
from google.cloud.exceptions import Conflict
|
||||
client = storage.Client()
|
||||
try:
|
||||
client.create_bucket(gcs_bucket)
|
||||
except Conflict:
|
||||
pass
|
||||
|
|
@ -31,13 +31,16 @@ class K8sHelper(object):
|
|||
|
||||
def _configure_k8s(self):
|
||||
try:
|
||||
config.load_kube_config()
|
||||
logging.info('Found local kubernetes config. Initialized with kube_config.')
|
||||
except:
|
||||
logging.info('Cannot Find local kubernetes config. Trying in-cluster config.')
|
||||
config.load_incluster_config()
|
||||
logging.info('Initialized with in-cluster config.')
|
||||
|
||||
except:
|
||||
logging.info('Cannot find in-cluster config, trying the local kubernetes config. ')
|
||||
try:
|
||||
config.load_kube_config()
|
||||
logging.info('Found local kubernetes config. Initialized with kube_config.')
|
||||
except:
|
||||
raise RuntimeError('Forgot to run the gcloud command? Check out the link: \
|
||||
https://cloud.google.com/kubernetes-engine/docs/how-to/cluster-access-for-kubectl for more information')
|
||||
self._api_client = k8s_client.ApiClient()
|
||||
self._corev1 = k8s_client.CoreV1Api(self._api_client)
|
||||
return True
|
||||
|
|
|
|||
|
|
@ -17,13 +17,15 @@ import tarfile
|
|||
import unittest
|
||||
import yaml
|
||||
import tempfile
|
||||
import mock
|
||||
from kfp.compiler._component_builder import ContainerBuilder
|
||||
|
||||
GCS_BASE = 'gs://kfp-testing/'
|
||||
|
||||
@mock.patch('kfp.compiler._gcs_helper.GCSHelper')
|
||||
class TestContainerBuild(unittest.TestCase):
|
||||
|
||||
def test_wrap_dir_in_tarball(self):
|
||||
def test_wrap_dir_in_tarball(self, mock_gcshelper):
|
||||
""" Test wrap files in a tarball """
|
||||
|
||||
# prepare
|
||||
|
|
@ -48,7 +50,7 @@ class TestContainerBuild(unittest.TestCase):
|
|||
# clean up
|
||||
os.remove(temp_tarball)
|
||||
|
||||
def test_generate_kaniko_yaml(self):
|
||||
def test_generate_kaniko_yaml(self, mock_gcshelper):
|
||||
""" Test generating the kaniko job yaml """
|
||||
|
||||
# prepare
|
||||
|
|
|
|||
Loading…
Reference in New Issue