mirror of https://github.com/kubeflow/examples.git
173 lines
5.9 KiB
Python
173 lines
5.9 KiB
Python
"""Some utilities for working with Kubernetes.
|
|
|
|
TODO: These should probably be replaced by functions in fairing.
|
|
"""
|
|
import logging
|
|
import re
|
|
import yaml
|
|
|
|
from kubernetes import client as k8s_client
|
|
from kubernetes.client import rest as k8s_rest
|
|
|
|
def camel_to_snake(name):
    """Convert a CamelCase identifier to snake_case.

    The conversion happens in two passes: first an underscore is inserted
    before every capitalised word (an upper-case letter followed by one or
    more lower-case letters), then between a lower-case letter or digit and
    the upper-case letter that follows it. The result is lower-cased.

    Args:
      name: The CamelCase string, e.g. a Kubernetes kind such as
        "VirtualService".

    Returns:
      The snake_case form of name, e.g. "virtual_service".
    """
    # Pass 1: split off capitalised words ("HTTPServer" -> "HTTP_Server").
    words_split = re.sub('(.)([A-Z][a-z]+)', r'\1_\2', name)
    # Pass 2: split lower/digit-to-upper transitions ("getHTTP" -> "get_HTTP").
    fully_split = re.sub('([a-z0-9])([A-Z])', r'\1_\2', words_split)
    return fully_split.lower()
|
|
|
|
# Modes accepted by apply_k8s_specs.
#
# K8S_CREATE: only issue a create; an already existing resource is an error.
K8S_CREATE = "K8S_CREATE"
# K8S_REPLACE: delete the resource first and then create it.
K8S_REPLACE = "K8S_REPLACE"
# K8S_CREATE_OR_REPLACE: try a create and fall back to delete + create when
# the API server reports a conflict.
K8S_CREATE_OR_REPLACE = "K8S_CREATE_OR_REPLACE"
|
|
|
|
def _get_result_name(result):
|
|
# For custom objects the result is a dict but for other objects
|
|
# its a python class
|
|
if isinstance(result, dict):
|
|
result_name = result["metadata"]["name"]
|
|
result_namespace = result["metadata"]["name"]
|
|
else:
|
|
result_name = result.metadata.name
|
|
result_namespace = result.metadata.namespace
|
|
|
|
return result_namespace, result_name
|
|
|
|
def apply_k8s_specs(specs, mode=K8S_CREATE):  # pylint: disable=too-many-branches,too-many-statements
    """Run apply on the provided Kubernetes specs.

    Args:
      specs: A list of strings or dicts providing the YAML specs to apply.
        Strings are parsed with yaml.safe_load. Every spec must provide
        "apiVersion", "kind" and "metadata.namespace"; "metadata.name" is
        only required when an existing resource has to be deleted.

      mode: (Optional): Mode indicates how the resources should be created.
        K8S_CREATE - Use the create verb. Works with generateName
        K8S_REPLACE - Issue a delete of existing resources before doing a
          create
        K8S_CREATE_OR_REPLACE - Try to create an object; if it already exists
          replace it

    Returns:
      A list holding, for each spec in order, the object returned by the
      create call.

    Raises:
      ValueError: if mode is not one of the supported modes, or if a resource
        has to be deleted but its spec has no metadata.name.
      KeyError: if a spec lacks one of the required fields.
      k8s_rest.ApiException: if a create or delete call fails, except for the
        409 conflict that K8S_CREATE_OR_REPLACE handles by replacing.
    """
    # TODO(jlewi): How should we handle patching existing updates?

    if mode not in [K8S_CREATE, K8S_CREATE_OR_REPLACE, K8S_REPLACE]:
        raise ValueError(f"Unknown mode {mode}")

    results = []

    for s in specs:
        spec = s
        if not isinstance(spec, dict):
            # safe_load only builds plain Python types, which is all a
            # Kubernetes manifest needs; yaml.load without an explicit Loader
            # is deprecated and rejected by recent PyYAML releases.
            spec = yaml.safe_load(spec)

        metadata = spec["metadata"]
        # The name may be absent when the spec relies on generateName.
        name = metadata.get("name")
        namespace = metadata["namespace"]
        kind = spec["kind"]
        kind_snake = camel_to_snake(kind)

        # NOTE(review): naive pluralisation; kinds whose plural is not
        # "<kind>s" would need special handling for custom objects.
        plural = kind.lower() + "s"

        api_version = spec["apiVersion"]
        if "/" in api_version:
            group, version = api_version.split("/", 1)
        else:
            # Core API objects have no group, only a version (e.g. "v1").
            group, version = None, api_version

        delete_method_kwargs = {}

        if group is None or group.lower() == "apps":
            if group is None:
                api = k8s_client.CoreV1Api()
            else:
                api = k8s_client.AppsV1Api()

            create_method_name = f"create_namespaced_{kind_snake}"
            create_method_args = [namespace, spec]

            delete_method_name = f"delete_namespaced_{kind_snake}"
            delete_method_args = [name, namespace]
        else:
            api = k8s_client.CustomObjectsApi()

            create_method_name = "create_namespaced_custom_object"
            create_method_args = [group, version, namespace, plural, spec]

            delete_method_name = "delete_namespaced_custom_object"
            delete_method_args = [group, version, namespace, plural, name]
            # Pass the delete options by keyword: "body" is a positional
            # parameter in older client releases and keyword-only in newer
            # ones, and the keyword form is accepted by both.
            delete_method_kwargs = {"body": k8s_client.V1DeleteOptions()}

        create_method = getattr(api, create_method_name)
        delete_method = getattr(api, delete_method_name)

        if mode in [K8S_CREATE, K8S_CREATE_OR_REPLACE]:
            try:
                result = create_method(*create_method_args)
            except k8s_rest.ApiException as e:
                # 409 is conflict indicates resource already exists
                if not (e.status == 409 and mode == K8S_CREATE_OR_REPLACE):
                    raise
            else:
                result_namespace, result_name = _get_result_name(result)
                logging.info(f"Created {kind} {result_namespace}.{result_name}")
                results.append(result)
                continue

        if name is None:
            raise ValueError(
                f"Cannot replace {kind} in namespace {namespace}: the spec "
                f"has no metadata.name")

        # Using replace didn't work for virtualservices so we explicitly delete
        # and then issue a create
        delete_method(*delete_method_args, **delete_method_kwargs)
        logging.info(f"Deleted {kind} {namespace}.{name}")

        # Now recreate it
        result = create_method(*create_method_args)
        result_namespace, result_name = _get_result_name(result)
        logging.info(f"Created {kind} {result_namespace}.{result_name}")
        results.append(result)

    return results
|
|
|
|
def get_iap_endpoint():
    """Return the URL of the IAP endpoint.

    Reads the "envoy-ingress" ingress in the "istio-system" namespace and
    builds an https URL from the host of its first rule.

    Returns:
      The endpoint URL, or an empty string when the API server answers with
      403; in that case a warning explaining how to grant access is logged.

    Raises:
      k8s_rest.ApiException: for any API error other than a 403.
    """
    api = k8s_client.ExtensionsV1beta1Api()

    try:
        ingress = api.read_namespaced_ingress("envoy-ingress", "istio-system")
    except k8s_rest.ApiException as err:
        if err.status != 403:
            raise

        # Insufficient permissions are reported to the user, not raised.
        message = (
            "The service account doesn't have sufficient privileges "
            "to get the istio-system ingress. "
            "You will have to manually enter the Kubeflow endpoint. "
            "To make this function work ask someone with cluster "
            "priveleges to create an appropriate "
            "clusterrolebinding by running a command.\n"
            "kubectl create --namespace=istio-system rolebinding "
            "--clusterrole=kubeflow-view "
            "--serviceaccount=${NAMESPACE}:default-editor "
            "${NAMESPACE}-istio-view"
        )
        logging.warning(message)
        return ""

    host = ingress.spec.rules[0].host
    return f"https://{host}"
|
|
|
|
## Used by AWS
|
|
def get_ingress_endpoint():
    """Return the URL of the Ingress endpoint.

    Reads the "istio-ingress" ingress in the "istio-system" namespace and
    builds an http URL from the hostname of its first load balancer entry.

    Returns:
      The endpoint URL, or an empty string when the API server answers with
      403; in that case a warning explaining how to grant access is logged.

    Raises:
      k8s_rest.ApiException: for any API error other than a 403.
    """
    api = k8s_client.ExtensionsV1beta1Api()

    try:
        ingress = api.read_namespaced_ingress("istio-ingress", "istio-system")
    except k8s_rest.ApiException as err:
        if err.status != 403:
            raise

        # Insufficient permissions are reported to the user, not raised.
        message = (
            "The service account doesn't have sufficient privileges "
            "to get the istio-system ingress. "
            "You will have to manually enter the Kubeflow endpoint. "
            "To make this function work ask someone with cluster "
            "priveleges to create an appropriate "
            "clusterrolebinding by running a command.\n"
            "kubectl create --namespace=istio-system rolebinding "
            "--clusterrole=kubeflow-view "
            "--serviceaccount=${NAMESPACE}:default-editor "
            "${NAMESPACE}-istio-view"
        )
        logging.warning(message)
        return ""

    hostname = ingress.status.load_balancer.ingress[0].hostname
    return f"http://{hostname}"