/* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. */ package controllers import ( "context" "fmt" "os" "github.com/go-logr/logr" "github.com/kubeflow/kubeflow/components/notebook-controller/api/v1beta1" "github.com/kubeflow/kubeflow/components/notebook-controller/pkg/culler" "github.com/kubeflow/kubeflow/components/notebook-controller/pkg/util" appsv1 "k8s.io/api/apps/v1" corev1 "k8s.io/api/core/v1" apierrs "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/intstr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/source" ) const DefaultContainerPort = 8888 const DefaultServingPort = 80 // The default fsGroup of PodSecurityContext. // https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.11/#podsecuritycontext-v1-core const DefaultFSGroup = int64(100) /* We generally want to ignore (not requeue) NotFound errors, since we'll get a reconciliation request once the object exists, and requeuing in the meantime won't help. */ func ignoreNotFound(err error) error { if apierrs.IsNotFound(err) { return nil } return err } // NotebookReconciler reconciles a Notebook object type NotebookReconciler struct { client.Client Log logr.Logger Scheme *runtime.Scheme } // +kubebuilder:rbac:groups=apps,resources=statefulsets,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=apps,resources=statefulsets/status,verbs=get;update;patch // +kubebuilder:rbac:groups=core,resources=services,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=core,resources=services/status,verbs=get;update;patch // +kubebuilder:rbac:groups=kubeflow.org,resources=notebooks,verbs=get;list;watch;create;update;patch;delete // +kubebuilder:rbac:groups=kubeflow.org,resources=notebooks/status,verbs=get;update;patch func (r *NotebookReconciler) Reconcile(req ctrl.Request) (ctrl.Result, error) { ctx := context.Background() log := r.Log.WithValues("notebook", req.NamespacedName) instance := &v1beta1.Notebook{} if err := r.Get(ctx, req.NamespacedName, instance); err != nil { log.Error(err, "unable to fetch Notebook") return ctrl.Result{}, ignoreNotFound(err) } // Reconcile StatefulSet ss := generateStatefulSet(instance) if err := ctrl.SetControllerReference(instance, ss, r.Scheme); err != nil { return ctrl.Result{}, err } // Check if the StatefulSet already exists foundStateful := &appsv1.StatefulSet{} justCreated := false err := r.Get(ctx, types.NamespacedName{Name: ss.Name, Namespace: ss.Namespace}, foundStateful) if err != nil && apierrs.IsNotFound(err) { log.Info("Creating StatefulSet", "namespace", ss.Namespace, "name", ss.Name) err = r.Create(ctx, ss) justCreated = true if err != nil { log.Error(err, "unable to create Statefulset") return ctrl.Result{}, err } } else if err != nil { log.Error(err, "error getting Statefulset") return ctrl.Result{}, err } // Update the foundStateful object and write the result back if there are any changes if !justCreated && util.CopyStatefulSetFields(ss, foundStateful) { log.Info("Updating StatefulSet", "namespace", ss.Namespace, "name", ss.Name) err = r.Update(ctx, foundStateful) if err != nil { log.Error(err, "unable to update Statefulset") return ctrl.Result{}, err } } // Reconcile service service := generateService(instance) if err := ctrl.SetControllerReference(instance, service, r.Scheme); err != nil { return ctrl.Result{}, err } // Check if the Service already exists foundService := &corev1.Service{} justCreated = false err = r.Get(ctx, types.NamespacedName{Name: service.Name, Namespace: service.Namespace}, foundService) if err != nil && apierrs.IsNotFound(err) { log.Info("Creating Service", "namespace", service.Namespace, "name", service.Name) err = r.Create(ctx, service) justCreated = true if err != nil { log.Error(err, "unable to create Service") return ctrl.Result{}, err } } else if err != nil { log.Error(err, "error getting Statefulset") return ctrl.Result{}, err } // Update the foundService object and write the result back if there are any changes if !justCreated && util.CopyServiceFields(service, foundService) { log.Info("Updating Service\n", "namespace", service.Namespace, "name", service.Name) err = r.Update(ctx, foundService) if err != nil { log.Error(err, "unable to update Service") return ctrl.Result{}, err } } // Reconcile virtual service if we use ISTIO. if os.Getenv("USE_ISTIO") == "true" { err = r.reconcileVirtualService(instance) if err != nil { return ctrl.Result{}, err } } // Update the readyReplicas if the status is changed if foundStateful.Status.ReadyReplicas != instance.Status.ReadyReplicas { log.Info("Updating Status", "namespace", instance.Namespace, "name", instance.Name) instance.Status.ReadyReplicas = foundStateful.Status.ReadyReplicas err = r.Status().Update(ctx, instance) if err != nil { return ctrl.Result{}, err } } // Check the pod status pod := &corev1.Pod{} podFound := false err = r.Get(ctx, types.NamespacedName{Name: ss.Name + "-0", Namespace: ss.Namespace}, pod) if err != nil && apierrs.IsNotFound(err) { // This should be reconciled by the StatefulSet log.Info("Pod not found...") } else if err != nil { return ctrl.Result{}, err } else { // Got the pod podFound = true if len(pod.Status.ContainerStatuses) > 0 && pod.Status.ContainerStatuses[0].State != instance.Status.ContainerState { log.Info("Updating container state: ", "namespace", instance.Namespace, "name", instance.Name) cs := pod.Status.ContainerStatuses[0].State instance.Status.ContainerState = cs oldConditions := instance.Status.Conditions newCondition := getNextCondition(cs) // Append new condition if len(oldConditions) == 0 || oldConditions[0].Type != newCondition.Type || oldConditions[0].Reason != newCondition.Reason || oldConditions[0].Message != newCondition.Message { log.Info("Appending to conditions: ", "namespace", instance.Namespace, "name", instance.Name, "type", newCondition.Type, "reason", newCondition.Reason, "message", newCondition.Message) instance.Status.Conditions = append([]v1beta1.NotebookCondition{newCondition}, oldConditions...) } err = r.Status().Update(ctx, instance) if err != nil { return ctrl.Result{}, err } } } // Check if the Notebook needs to be stopped if podFound && culler.NotebookNeedsCulling(instance.ObjectMeta) { log.Info(fmt.Sprintf( "Notebook %s/%s needs culling. Setting annotations", instance.Namespace, instance.Name)) // Set annotations to the Notebook culler.SetStopAnnotation(&instance.ObjectMeta) err = r.Update(ctx, instance) if err != nil { return ctrl.Result{}, err } } else if podFound && !culler.StopAnnotationIsSet(instance.ObjectMeta) { // The Pod is either too fresh, or the idle time has passed and it has // received traffic. In this case we will be periodically checking if // it needs culling. return ctrl.Result{RequeueAfter: culler.GetRequeueTime()}, nil } return ctrl.Result{}, nil } func getNextCondition(cs corev1.ContainerState) v1beta1.NotebookCondition { var nbtype = "" var nbreason = "" var nbmsg = "" if cs.Running != nil { nbtype = "Running" } else if cs.Waiting != nil { nbtype = "Waiting" nbreason = cs.Waiting.Reason nbmsg = cs.Waiting.Message } else { nbtype = "Terminated" nbreason = cs.Terminated.Reason nbmsg = cs.Terminated.Reason } newCondition := v1beta1.NotebookCondition{ Type: nbtype, LastProbeTime: metav1.Now(), Reason: nbreason, Message: nbmsg, } return newCondition } func generateStatefulSet(instance *v1beta1.Notebook) *appsv1.StatefulSet { replicas := int32(1) if culler.StopAnnotationIsSet(instance.ObjectMeta) { replicas = 0 } ss := &appsv1.StatefulSet{ ObjectMeta: metav1.ObjectMeta{ Name: instance.Name, Namespace: instance.Namespace, }, Spec: appsv1.StatefulSetSpec{ Replicas: &replicas, Selector: &metav1.LabelSelector{ MatchLabels: map[string]string{ "statefulset": instance.Name, }, }, Template: corev1.PodTemplateSpec{ ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{ "statefulset": instance.Name, "notebook-name": instance.Name, }}, Spec: instance.Spec.Template.Spec, }, }, } // copy all of the Notebook labels to the pod including poddefault related labels l := &ss.Spec.Template.ObjectMeta.Labels for k, v := range instance.ObjectMeta.Labels { (*l)[k] = v } podSpec := &ss.Spec.Template.Spec container := &podSpec.Containers[0] if container.WorkingDir == "" { container.WorkingDir = "/home/jovyan" } if container.Ports == nil { container.Ports = []corev1.ContainerPort{ { ContainerPort: DefaultContainerPort, Name: "notebook-port", Protocol: "TCP", }, } } container.Env = append(container.Env, corev1.EnvVar{ Name: "NB_PREFIX", Value: "/notebook/" + instance.Namespace + "/" + instance.Name, }) if podSpec.SecurityContext == nil { fsGroup := DefaultFSGroup podSpec.SecurityContext = &corev1.PodSecurityContext{ FSGroup: &fsGroup, } } return ss } func generateService(instance *v1beta1.Notebook) *corev1.Service { // Define the desired Service object port := DefaultContainerPort containerPorts := instance.Spec.Template.Spec.Containers[0].Ports if containerPorts != nil { port = int(containerPorts[0].ContainerPort) } svc := &corev1.Service{ ObjectMeta: metav1.ObjectMeta{ Name: instance.Name, Namespace: instance.Namespace, }, Spec: corev1.ServiceSpec{ Type: "ClusterIP", Selector: map[string]string{"statefulset": instance.Name}, Ports: []corev1.ServicePort{ { // Make port name follow Istio pattern so it can be managed by istio rbac Name: "http-" + instance.Name, Port: DefaultServingPort, TargetPort: intstr.FromInt(port), Protocol: "TCP", }, }, }, } return svc } func virtualServiceName(kfName string, namespace string) string { return fmt.Sprintf("notebook-%s-%s", namespace, kfName) } func generateVirtualService(instance *v1beta1.Notebook) (*unstructured.Unstructured, error) { name := instance.Name namespace := instance.Namespace prefix := fmt.Sprintf("/notebook/%s/%s", namespace, name) rewrite := fmt.Sprintf("/notebook/%s/%s", namespace, name) // TODO(gabrielwen): Make clusterDomain an option. service := fmt.Sprintf("%s.%s.svc.cluster.local", name, namespace) vsvc := &unstructured.Unstructured{} vsvc.SetAPIVersion("networking.istio.io/v1alpha3") vsvc.SetKind("VirtualService") vsvc.SetName(virtualServiceName(name, namespace)) vsvc.SetNamespace(namespace) if err := unstructured.SetNestedStringSlice(vsvc.Object, []string{"*"}, "spec", "hosts"); err != nil { return nil, fmt.Errorf("Set .spec.hosts error: %v", err) } istioGateway := os.Getenv("ISTIO_GATEWAY") if len(istioGateway) == 0 { istioGateway = "kubeflow/kubeflow-gateway" } if err := unstructured.SetNestedStringSlice(vsvc.Object, []string{istioGateway}, "spec", "gateways"); err != nil { return nil, fmt.Errorf("Set .spec.gateways error: %v", err) } http := []interface{}{ map[string]interface{}{ "match": []interface{}{ map[string]interface{}{ "uri": map[string]interface{}{ "prefix": prefix, }, }, }, "rewrite": map[string]interface{}{ "uri": rewrite, }, "route": []interface{}{ map[string]interface{}{ "destination": map[string]interface{}{ "host": service, "port": map[string]interface{}{ "number": int64(DefaultServingPort), }, }, }, }, "timeout": "300s", }, } if err := unstructured.SetNestedSlice(vsvc.Object, http, "spec", "http"); err != nil { return nil, fmt.Errorf("Set .spec.http error: %v", err) } return vsvc, nil } func (r *NotebookReconciler) reconcileVirtualService(instance *v1beta1.Notebook) error { log := r.Log.WithValues("notebook", instance.Namespace) virtualService, err := generateVirtualService(instance) if err := ctrl.SetControllerReference(instance, virtualService, r.Scheme); err != nil { return err } // Check if the virtual service already exists. foundVirtual := &unstructured.Unstructured{} justCreated := false foundVirtual.SetAPIVersion("networking.istio.io/v1alpha3") foundVirtual.SetKind("VirtualService") err = r.Get(context.TODO(), types.NamespacedName{Name: virtualServiceName(instance.Name, instance.Namespace), Namespace: instance.Namespace}, foundVirtual) if err != nil && apierrs.IsNotFound(err) { log.Info("Creating virtual service", "namespace", instance.Namespace, "name", virtualServiceName(instance.Name, instance.Namespace)) err = r.Create(context.TODO(), virtualService) justCreated = true if err != nil { return err } } else if err != nil { return err } if !justCreated && util.CopyVirtualService(virtualService, foundVirtual) { log.Info("Updating virtual service", "namespace", instance.Namespace, "name", virtualServiceName(instance.Name, instance.Namespace)) err = r.Update(context.TODO(), foundVirtual) if err != nil { return err } } return nil } func (r *NotebookReconciler) SetupWithManager(mgr ctrl.Manager) error { builder := ctrl.NewControllerManagedBy(mgr). For(&v1beta1.Notebook{}). Owns(&appsv1.StatefulSet{}). Owns(&corev1.Service{}) // watch Istio virtual service if os.Getenv("USE_ISTIO") == "true" { virtualService := &unstructured.Unstructured{} virtualService.SetAPIVersion("networking.istio.io/v1alpha3") virtualService.SetKind("VirtualService") builder.Owns(virtualService) } // TODO(lunkai): After this is fixed: // https://github.com/kubernetes-sigs/controller-runtime/issues/572 // We don't have to call Build to get the controller. c, err := builder.Build(r) if err != nil { return err } // watch underlying pod mapFn := handler.ToRequestsFunc( func(a handler.MapObject) []ctrl.Request { return []ctrl.Request{ {NamespacedName: types.NamespacedName{ Name: a.Meta.GetLabels()["notebook-name"], Namespace: a.Meta.GetNamespace(), }}, } }) p := predicate.Funcs{ UpdateFunc: func(e event.UpdateEvent) bool { if _, ok := e.MetaOld.GetLabels()["notebook-name"]; !ok { return false } return e.ObjectOld != e.ObjectNew }, CreateFunc: func(e event.CreateEvent) bool { if _, ok := e.Meta.GetLabels()["notebook-name"]; !ok { return false } return true }, } return c.Watch( &source.Kind{Type: &corev1.Pod{}}, &handler.EnqueueRequestsFromMapFunc{ ToRequests: mapFn, }, p) }