// notebooks/components/notebook-controller/controllers/notebook_controller.go
//
// 490 lines
// 15 KiB
// Go

/*
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package controllers
import (
	"context"
	"fmt"
	"os"
	"reflect"

	"github.com/go-logr/logr"
	"github.com/kubeflow/kubeflow/components/notebook-controller/api/v1beta1"
	"github.com/kubeflow/kubeflow/components/notebook-controller/pkg/culler"
	"github.com/kubeflow/kubeflow/components/notebook-controller/pkg/metrics"
	"github.com/kubeflow/kubeflow/components/notebook-controller/pkg/util"
	appsv1 "k8s.io/api/apps/v1"
	corev1 "k8s.io/api/core/v1"
	apierrs "k8s.io/apimachinery/pkg/api/errors"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/apimachinery/pkg/util/intstr"
	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"
	"sigs.k8s.io/controller-runtime/pkg/event"
	"sigs.k8s.io/controller-runtime/pkg/handler"
	"sigs.k8s.io/controller-runtime/pkg/predicate"
	"sigs.k8s.io/controller-runtime/pkg/source"
)
// Defaults applied to the objects generated for a Notebook.
const (
	// DefaultContainerPort is the notebook container port used when the
	// Notebook's first container declares no ports.
	DefaultContainerPort = 8888

	// DefaultServingPort is the port exposed by the Notebook's Service (and
	// targeted by the Istio VirtualService route).
	DefaultServingPort = 80

	// DefaultFSGroup is the fsGroup of the PodSecurityContext applied when the
	// Notebook's pod spec has no security context.
	// https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.11/#podsecuritycontext-v1-core
	DefaultFSGroup = int64(100)
)
// ignoreNotFound returns nil if err is a Kubernetes NotFound API error, and
// returns err unchanged otherwise (including when err is nil).
//
// We generally want to ignore (not requeue) NotFound errors: a new
// reconciliation request arrives once the object exists, so requeuing in the
// meantime won't help.
func ignoreNotFound(err error) error {
	if !apierrs.IsNotFound(err) {
		return err
	}
	return nil
}
// NotebookReconciler reconciles a Notebook object by creating and updating the
// StatefulSet, the Service and (when USE_ISTIO is "true") the Istio
// VirtualService that back it, and by mirroring pod state into the Notebook
// status.
type NotebookReconciler struct {
	// Client is used for every read and write against the API server,
	// including status updates.
	client.Client
	// Log is the base logger; Reconcile scopes it with the notebook's
	// namespaced name.
	Log logr.Logger
	// Scheme is used to set controller (owner) references on generated objects.
	Scheme *runtime.Scheme
	// Metrics records the notebook creation and creation-failure counters.
	Metrics *metrics.Metrics
}
// Reconcile brings the objects backing a Notebook in line with its spec.
//
// In order it:
//   - creates or updates the Notebook's StatefulSet and Service;
//   - reconciles the Istio VirtualService when USE_ISTIO is "true";
//   - mirrors the StatefulSet's ready replicas into the Notebook status;
//   - mirrors the state of the first container of pod "<name>-0" into the
//     status, prepending a condition when its type, reason or message changes;
//   - sets the culler's stop annotation when the notebook needs culling, or
//     otherwise requeues after culler.GetRequeueTime() to check again.
//
// +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps,resources=statefulsets/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=core,resources=services/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=kubeflow.org,resources=notebooks,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=kubeflow.org,resources=notebooks/status,verbs=get;update;patch
func (r *NotebookReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) {
	ctx := context.Background()
	log := r.Log.WithValues("notebook", req.NamespacedName)

	instance := &v1beta1.Notebook{}
	if err := r.Get(ctx, req.NamespacedName, instance); err != nil {
		// NotFound is the normal outcome once a Notebook has been deleted;
		// only report real failures.
		if !apierrs.IsNotFound(err) {
			log.Error(err, "unable to fetch Notebook")
		}
		return ctrl.Result{}, ignoreNotFound(err)
	}

	// Reconcile StatefulSet
	ss := generateStatefulSet(instance)
	if err := ctrl.SetControllerReference(instance, ss, r.Scheme); err != nil {
		return ctrl.Result{}, err
	}
	// Check if the StatefulSet already exists
	foundStateful := &appsv1.StatefulSet{}
	justCreated := false
	err := r.Get(ctx, types.NamespacedName{Name: ss.Name, Namespace: ss.Namespace}, foundStateful)
	if err != nil && apierrs.IsNotFound(err) {
		log.Info("Creating StatefulSet", "namespace", ss.Namespace, "name", ss.Name)
		r.Metrics.NotebookCreation.WithLabelValues(ss.Namespace).Inc()
		err = r.Create(ctx, ss)
		justCreated = true
		if err != nil {
			log.Error(err, "unable to create Statefulset")
			r.Metrics.NotebookFailCreation.WithLabelValues(ss.Namespace).Inc()
			return ctrl.Result{}, err
		}
	} else if err != nil {
		log.Error(err, "error getting Statefulset")
		return ctrl.Result{}, err
	}
	// Update the foundStateful object and write the result back if there are any changes
	if !justCreated && util.CopyStatefulSetFields(ss, foundStateful) {
		log.Info("Updating StatefulSet", "namespace", ss.Namespace, "name", ss.Name)
		if err := r.Update(ctx, foundStateful); err != nil {
			log.Error(err, "unable to update Statefulset")
			return ctrl.Result{}, err
		}
	}

	// Reconcile service
	service := generateService(instance)
	if err := ctrl.SetControllerReference(instance, service, r.Scheme); err != nil {
		return ctrl.Result{}, err
	}
	// Check if the Service already exists
	foundService := &corev1.Service{}
	justCreated = false
	err = r.Get(ctx, types.NamespacedName{Name: service.Name, Namespace: service.Namespace}, foundService)
	if err != nil && apierrs.IsNotFound(err) {
		log.Info("Creating Service", "namespace", service.Namespace, "name", service.Name)
		err = r.Create(ctx, service)
		justCreated = true
		if err != nil {
			log.Error(err, "unable to create Service")
			return ctrl.Result{}, err
		}
	} else if err != nil {
		log.Error(err, "error getting Service")
		return ctrl.Result{}, err
	}
	// Update the foundService object and write the result back if there are any changes
	if !justCreated && util.CopyServiceFields(service, foundService) {
		log.Info("Updating Service", "namespace", service.Namespace, "name", service.Name)
		if err := r.Update(ctx, foundService); err != nil {
			log.Error(err, "unable to update Service")
			return ctrl.Result{}, err
		}
	}

	// Reconcile virtual service if we use ISTIO.
	if os.Getenv("USE_ISTIO") == "true" {
		if err := r.reconcileVirtualService(instance); err != nil {
			return ctrl.Result{}, err
		}
	}

	// Update the readyReplicas if the status is changed. When the StatefulSet
	// was just created, foundStateful is still empty (0 ready replicas).
	if foundStateful.Status.ReadyReplicas != instance.Status.ReadyReplicas {
		log.Info("Updating Status", "namespace", instance.Namespace, "name", instance.Name)
		instance.Status.ReadyReplicas = foundStateful.Status.ReadyReplicas
		if err := r.Status().Update(ctx, instance); err != nil {
			return ctrl.Result{}, err
		}
	}

	// Check the pod status
	pod := &corev1.Pod{}
	podFound := false
	err = r.Get(ctx, types.NamespacedName{Name: ss.Name + "-0", Namespace: ss.Namespace}, pod)
	if err != nil && apierrs.IsNotFound(err) {
		// This should be reconciled by the StatefulSet
		log.Info("Pod not found...")
	} else if err != nil {
		return ctrl.Result{}, err
	} else {
		// Got the pod
		podFound = true
		// ContainerState holds only pointer fields, so != would compare
		// addresses (different for every separately decoded object) and
		// trigger a status write on every reconcile. Compare the values.
		if len(pod.Status.ContainerStatuses) > 0 &&
			!reflect.DeepEqual(pod.Status.ContainerStatuses[0].State, instance.Status.ContainerState) {
			log.Info("Updating container state", "namespace", instance.Namespace, "name", instance.Name)
			cs := pod.Status.ContainerStatuses[0].State
			instance.Status.ContainerState = cs
			oldConditions := instance.Status.Conditions
			newCondition := getNextCondition(cs)
			// Prepend a new condition only when it differs from the latest one
			// (LastProbeTime is deliberately not compared).
			if len(oldConditions) == 0 || oldConditions[0].Type != newCondition.Type ||
				oldConditions[0].Reason != newCondition.Reason ||
				oldConditions[0].Message != newCondition.Message {
				log.Info("Appending to conditions", "namespace", instance.Namespace, "name", instance.Name, "type", newCondition.Type, "reason", newCondition.Reason, "message", newCondition.Message)
				instance.Status.Conditions = append([]v1beta1.NotebookCondition{newCondition}, oldConditions...)
			}
			if err := r.Status().Update(ctx, instance); err != nil {
				return ctrl.Result{}, err
			}
		}
	}

	// Check if the Notebook needs to be stopped
	if podFound && culler.NotebookNeedsCulling(instance.ObjectMeta) {
		log.Info(fmt.Sprintf(
			"Notebook %s/%s needs culling. Setting annotations",
			instance.Namespace, instance.Name))
		// Set annotations to the Notebook
		culler.SetStopAnnotation(&instance.ObjectMeta)
		if err := r.Update(ctx, instance); err != nil {
			return ctrl.Result{}, err
		}
	} else if podFound && !culler.StopAnnotationIsSet(instance.ObjectMeta) {
		// The Pod is either too fresh, or the idle time has passed and it has
		// received traffic. In this case we will be periodically checking if
		// it needs culling.
		return ctrl.Result{RequeueAfter: culler.GetRequeueTime()}, nil
	}
	return ctrl.Result{}, nil
}
// getNextCondition translates a container state into the NotebookCondition
// that Reconcile may prepend to the Notebook's condition list. LastProbeTime
// is set to the current time.
//
// Kubernetes specifies that a ContainerState with none of its members set
// defaults to Waiting, so that case is reported as "Waiting" with an empty
// reason and message instead of dereferencing a nil Terminated pointer.
func getNextCondition(cs corev1.ContainerState) v1beta1.NotebookCondition {
	var nbtype, nbreason, nbmsg string
	switch {
	case cs.Running != nil:
		nbtype = "Running"
	case cs.Waiting != nil:
		nbtype = "Waiting"
		nbreason = cs.Waiting.Reason
		nbmsg = cs.Waiting.Message
	case cs.Terminated != nil:
		nbtype = "Terminated"
		nbreason = cs.Terminated.Reason
		nbmsg = cs.Terminated.Message
	default:
		nbtype = "Waiting"
	}
	return v1beta1.NotebookCondition{
		Type:          nbtype,
		LastProbeTime: metav1.Now(),
		Reason:        nbreason,
		Message:       nbmsg,
	}
}
// generateStatefulSet builds the desired StatefulSet for a Notebook.
//
// It runs one replica, or zero when the culler's stop annotation is set. The
// pod template carries the "statefulset" and "notebook-name" labels plus every
// Notebook label, and a deep copy of the Notebook's pod spec with defaults
// filled in:
//   - the first container gets working dir /home/jovyan when unset;
//   - it gets a TCP "notebook-port" on DefaultContainerPort when it declares
//     no ports;
//   - it always gets NB_PREFIX=/notebook/<namespace>/<name>;
//   - the pod gets fsGroup DefaultFSGroup when it has no security context.
//
// The deep copy ensures these defaults never leak back into instance.
func generateStatefulSet(instance *v1beta1.Notebook) *appsv1.StatefulSet {
	replicas := int32(1)
	if culler.StopAnnotationIsSet(instance.ObjectMeta) {
		replicas = 0
	}

	podLabels := map[string]string{
		"statefulset":   instance.Name,
		"notebook-name": instance.Name,
	}
	// copy all of the Notebook labels to the pod including poddefault related labels
	for k, v := range instance.ObjectMeta.Labels {
		podLabels[k] = v
	}

	ss := &appsv1.StatefulSet{
		ObjectMeta: metav1.ObjectMeta{
			Name:      instance.Name,
			Namespace: instance.Namespace,
		},
		Spec: appsv1.StatefulSetSpec{
			Replicas: &replicas,
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{
					"statefulset": instance.Name,
				},
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: podLabels},
				// Deep copy: a plain struct copy would share the Containers
				// slice with instance, so the defaults set below would mutate
				// the Notebook itself (and be persisted by a later Update).
				Spec: *instance.Spec.Template.Spec.DeepCopy(),
			},
		},
	}

	podSpec := &ss.Spec.Template.Spec
	if len(podSpec.Containers) > 0 {
		container := &podSpec.Containers[0]
		if container.WorkingDir == "" {
			container.WorkingDir = "/home/jovyan"
		}
		if len(container.Ports) == 0 {
			container.Ports = []corev1.ContainerPort{
				{
					ContainerPort: DefaultContainerPort,
					Name:          "notebook-port",
					Protocol:      "TCP",
				},
			}
		}
		container.Env = append(container.Env, corev1.EnvVar{
			Name:  "NB_PREFIX",
			Value: "/notebook/" + instance.Namespace + "/" + instance.Name,
		})
	}

	if podSpec.SecurityContext == nil {
		fsGroup := DefaultFSGroup
		podSpec.SecurityContext = &corev1.PodSecurityContext{
			FSGroup: &fsGroup,
		}
	}
	return ss
}
// generateService builds the desired ClusterIP Service for a Notebook.
//
// The Service selects the Notebook's pods by the "statefulset" label and
// exposes DefaultServingPort. It forwards to the first port of the first
// container, or to DefaultContainerPort when no container or port is declared.
func generateService(instance *v1beta1.Notebook) *corev1.Service {
	// Define the desired Service object
	port := DefaultContainerPort
	// len checks (not nil checks) so an explicitly empty list cannot panic.
	if containers := instance.Spec.Template.Spec.Containers; len(containers) > 0 && len(containers[0].Ports) > 0 {
		port = int(containers[0].Ports[0].ContainerPort)
	}

	return &corev1.Service{
		ObjectMeta: metav1.ObjectMeta{
			Name:      instance.Name,
			Namespace: instance.Namespace,
		},
		Spec: corev1.ServiceSpec{
			Type:     "ClusterIP",
			Selector: map[string]string{"statefulset": instance.Name},
			Ports: []corev1.ServicePort{
				{
					// Make port name follow Istio pattern so it can be managed by istio rbac
					Name:       "http-" + instance.Name,
					Port:       DefaultServingPort,
					TargetPort: intstr.FromInt(port),
					Protocol:   "TCP",
				},
			},
		},
	}
}
func virtualServiceName(kfName string, namespace string) string {
return fmt.Sprintf("notebook-%s-%s", namespace, kfName)
}
// generateVirtualService builds the Istio VirtualService (as an unstructured
// networking.istio.io/v1alpha3 object) that routes requests whose URI starts
// with /notebook/<namespace>/<name> to the Notebook's Service.
//
// The route goes to <name>.<namespace>.svc.cluster.local on DefaultServingPort,
// with a 300s timeout. The gateway is taken from ISTIO_GATEWAY, defaulting to
// "kubeflow/kubeflow-gateway". An error is returned if any spec field cannot
// be set.
func generateVirtualService(instance *v1beta1.Notebook) (*unstructured.Unstructured, error) {
	name := instance.Name
	namespace := instance.Namespace
	// NOTE(review): without a trailing slash this prefix also matches other
	// notebooks whose name starts with this one (e.g. "nb" matches
	// "/notebook/ns/nb2/..."). Consider "/notebook/%s/%s/"; confirm against
	// the web app's URL scheme before changing.
	prefix := fmt.Sprintf("/notebook/%s/%s", namespace, name)
	// The rewrite target is the same path: the notebook container is started
	// with NB_PREFIX set to this value, so it serves under it.
	rewrite := prefix
	// TODO(gabrielwen): Make clusterDomain an option.
	service := fmt.Sprintf("%s.%s.svc.cluster.local", name, namespace)

	vsvc := &unstructured.Unstructured{}
	vsvc.SetAPIVersion("networking.istio.io/v1alpha3")
	vsvc.SetKind("VirtualService")
	vsvc.SetName(virtualServiceName(name, namespace))
	vsvc.SetNamespace(namespace)
	if err := unstructured.SetNestedStringSlice(vsvc.Object, []string{"*"}, "spec", "hosts"); err != nil {
		return nil, fmt.Errorf("setting .spec.hosts: %v", err)
	}

	istioGateway := os.Getenv("ISTIO_GATEWAY")
	if istioGateway == "" {
		istioGateway = "kubeflow/kubeflow-gateway"
	}
	if err := unstructured.SetNestedStringSlice(vsvc.Object, []string{istioGateway},
		"spec", "gateways"); err != nil {
		return nil, fmt.Errorf("setting .spec.gateways: %v", err)
	}

	http := []interface{}{
		map[string]interface{}{
			"match": []interface{}{
				map[string]interface{}{
					"uri": map[string]interface{}{
						"prefix": prefix,
					},
				},
			},
			"rewrite": map[string]interface{}{
				"uri": rewrite,
			},
			"route": []interface{}{
				map[string]interface{}{
					"destination": map[string]interface{}{
						"host": service,
						"port": map[string]interface{}{
							// int64, not int: unstructured content must be
							// JSON-compatible values.
							"number": int64(DefaultServingPort),
						},
					},
				},
			},
			"timeout": "300s",
		},
	}
	if err := unstructured.SetNestedSlice(vsvc.Object, http, "spec", "http"); err != nil {
		return nil, fmt.Errorf("setting .spec.http: %v", err)
	}
	return vsvc, nil
}
// reconcileVirtualService ensures the Istio VirtualService for instance exists,
// is controlled by the Notebook, and matches the generated spec.
//
// It creates the VirtualService when it is missing. Otherwise it updates the
// existing one when util.CopyVirtualService reports changes. Any generation,
// owner-reference or API error is returned.
func (r *NotebookReconciler) reconcileVirtualService(instance *v1beta1.Notebook) error {
	ctx := context.TODO()
	log := r.Log.WithValues("notebook", types.NamespacedName{Namespace: instance.Namespace, Name: instance.Name})
	vsName := virtualServiceName(instance.Name, instance.Namespace)

	virtualService, err := generateVirtualService(instance)
	if err != nil {
		// virtualService is nil here; it must not reach SetControllerReference.
		return err
	}
	if err := ctrl.SetControllerReference(instance, virtualService, r.Scheme); err != nil {
		return err
	}

	// Check if the virtual service already exists.
	foundVirtual := &unstructured.Unstructured{}
	foundVirtual.SetAPIVersion("networking.istio.io/v1alpha3")
	foundVirtual.SetKind("VirtualService")
	err = r.Get(ctx, types.NamespacedName{Name: vsName, Namespace: instance.Namespace}, foundVirtual)
	switch {
	case apierrs.IsNotFound(err):
		log.Info("Creating virtual service", "namespace", instance.Namespace, "name", vsName)
		return r.Create(ctx, virtualService)
	case err != nil:
		return err
	}

	if util.CopyVirtualService(virtualService, foundVirtual) {
		log.Info("Updating virtual service", "namespace", instance.Namespace, "name", vsName)
		return r.Update(ctx, foundVirtual)
	}
	return nil
}
// SetupWithManager registers the reconciler with mgr.
//
// Reconciles are triggered by:
//   - changes to Notebooks and to the StatefulSets and Services they own;
//   - changes to owned Istio VirtualServices, when USE_ISTIO is "true";
//   - events for Pods labeled "notebook-name", each mapped to the Notebook of
//     that name in the Pod's namespace.
func (r *NotebookReconciler) SetupWithManager(mgr ctrl.Manager) error {
	builder := ctrl.NewControllerManagedBy(mgr).
		For(&v1beta1.Notebook{}).
		Owns(&appsv1.StatefulSet{}).
		Owns(&corev1.Service{})
	// watch Istio virtual service
	if os.Getenv("USE_ISTIO") == "true" {
		virtualService := &unstructured.Unstructured{}
		virtualService.SetAPIVersion("networking.istio.io/v1alpha3")
		virtualService.SetKind("VirtualService")
		builder.Owns(virtualService)
	}
	// TODO(lunkai): After this is fixed:
	// https://github.com/kubernetes-sigs/controller-runtime/issues/572
	// We don't have to call Build to get the controller.
	c, err := builder.Build(r)
	if err != nil {
		return err
	}

	// watch underlying pod
	mapFn := handler.ToRequestsFunc(
		func(a handler.MapObject) []ctrl.Request {
			name, ok := a.Meta.GetLabels()["notebook-name"]
			if !ok {
				// Not a notebook pod. The predicate below only filters
				// create/update events, so delete and generic events for
				// unlabeled pods arrive here too; enqueueing them would
				// produce a request with an empty name.
				return nil
			}
			return []ctrl.Request{
				{NamespacedName: types.NamespacedName{
					Name:      name,
					Namespace: a.Meta.GetNamespace(),
				}},
			}
		})

	p := predicate.Funcs{
		UpdateFunc: func(e event.UpdateEvent) bool {
			if _, ok := e.MetaOld.GetLabels()["notebook-name"]; !ok {
				return false
			}
			// Identity (pointer) comparison. Presumably this drops informer
			// resyncs, which deliver the same object as old and new; TODO
			// confirm before changing.
			return e.ObjectOld != e.ObjectNew
		},
		CreateFunc: func(e event.CreateEvent) bool {
			_, ok := e.Meta.GetLabels()["notebook-name"]
			return ok
		},
	}

	return c.Watch(
		&source.Kind{Type: &corev1.Pod{}},
		&handler.EnqueueRequestsFromMapFunc{
			ToRequests: mapFn,
		},
		p)
}