feat(ws): add `podTemplatePod` to Workspace status, for backend logs (#210)

* feat(ws): add `podTemplatePod` to Workspace status, for backend logs

Signed-off-by: Mathew Wicks <5735406+thesuperzapper@users.noreply.github.com>

* remove accidental diff

Signed-off-by: Mathew Wicks <5735406+thesuperzapper@users.noreply.github.com>

---------

Signed-off-by: Mathew Wicks <5735406+thesuperzapper@users.noreply.github.com>
This commit is contained in:
Mathew Wicks 2025-02-14 14:05:37 -08:00 committed by GitHub
parent f22890d307
commit 6f147902d7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
5 changed files with 239 additions and 72 deletions

View File

@ -164,9 +164,12 @@ type WorkspaceStatus struct {
// +kubebuilder:default=false
PendingRestart bool `json:"pendingRestart"`
// information about the current podTemplate options
// information about the current podTemplate options (only set for WorkspaceKind of podTemplate kind)
PodTemplateOptions WorkspacePodOptionsStatus `json:"podTemplateOptions"`
// information about the Pod managed by this Workspace (only set for WorkspaceKind of podTemplate kind)
PodTemplatePod WorkspacePodStatus `json:"podTemplatePod"`
// the current state of the Workspace
// +kubebuilder:default="Unknown"
State WorkspaceState `json:"state"`
@ -221,6 +224,24 @@ type WorkspacePodOptionRedirectStep struct {
Target string `json:"target"`
}
// WorkspacePodStatus describes a Pod by its name and the names of its containers and initContainers.
type WorkspacePodStatus struct {
// the name of the Pod resource
Name string `json:"name"`
// information about the Pod's containers
// +kubebuilder:validation:Optional
Containers []WorkspacePodContainer `json:"containers,omitempty"`
// information about the Pod's initContainers
// +kubebuilder:validation:Optional
InitContainers []WorkspacePodContainer `json:"initContainers,omitempty"`
}
// WorkspacePodContainer identifies a single container of a Pod by its name.
type WorkspacePodContainer struct {
// the name of the container
Name string `json:"name"`
}
// +kubebuilder:validation:Enum:={"Running","Terminating","Paused","Pending","Error","Unknown"}
type WorkspaceState string

View File

@ -907,6 +907,21 @@ func (in *WorkspaceList) DeepCopyObject() runtime.Object {
return nil
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *WorkspacePodContainer) DeepCopyInto(out *WorkspacePodContainer) {
// WorkspacePodContainer only holds a string field, so a value assignment is already a full copy
*out = *in
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkspacePodContainer.
func (in *WorkspacePodContainer) DeepCopy() *WorkspacePodContainer {
// a nil receiver yields a nil copy, rather than a pointer to a zero value
if in == nil {
return nil
}
// allocate the copy and let DeepCopyInto fill it
out := new(WorkspacePodContainer)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *WorkspacePodMetadata) DeepCopyInto(out *WorkspacePodMetadata) {
*out = *in
@ -1003,6 +1018,31 @@ func (in *WorkspacePodOptionsStatus) DeepCopy() *WorkspacePodOptionsStatus {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *WorkspacePodStatus) DeepCopyInto(out *WorkspacePodStatus) {
// copy every field by value; at this point the slice fields of `out` still share their backing arrays with `in`
*out = *in
// give `out` its own Containers slice (a nil slice is left nil)
if in.Containers != nil {
// NOTE: `in` and `out` are shadowed inside this block and point at the slice fields, not at the structs
in, out := &in.Containers, &out.Containers
*out = make([]WorkspacePodContainer, len(*in))
copy(*out, *in)
}
// give `out` its own InitContainers slice (a nil slice is left nil)
if in.InitContainers != nil {
// NOTE: `in` and `out` are shadowed inside this block and point at the slice fields, not at the structs
in, out := &in.InitContainers, &out.InitContainers
*out = make([]WorkspacePodContainer, len(*in))
copy(*out, *in)
}
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkspacePodStatus.
func (in *WorkspacePodStatus) DeepCopy() *WorkspacePodStatus {
// a nil receiver yields a nil copy, rather than a pointer to a zero value
if in == nil {
return nil
}
// allocate the copy and let DeepCopyInto fill it, including fresh slices
out := new(WorkspacePodStatus)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *WorkspacePodTemplate) DeepCopyInto(out *WorkspacePodTemplate) {
*out = *in
@ -1083,6 +1123,7 @@ func (in *WorkspaceStatus) DeepCopyInto(out *WorkspaceStatus) {
*out = *in
out.Activity = in.Activity
in.PodTemplateOptions.DeepCopyInto(&out.PodTemplateOptions)
in.PodTemplatePod.DeepCopyInto(&out.PodTemplatePod)
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new WorkspaceStatus.

View File

@ -208,7 +208,8 @@ spec:
like `podMetadata`, `probes`, `extraEnv`, or `containerSecurityContext`
type: boolean
podTemplateOptions:
description: information about the current podTemplate options
description: information about the current podTemplate options (only
set for WorkspaceKind of podTemplate kind)
properties:
imageConfig:
description: info about the current imageConfig option
@ -274,6 +275,38 @@ spec:
- imageConfig
- podConfig
type: object
podTemplatePod:
description: information about the Pod managed by this Workspace (only
set for WorkspaceKind of podTemplate kind)
properties:
containers:
description: information about the Pod's containers
items:
properties:
name:
description: the name of the container
type: string
required:
- name
type: object
type: array
initContainers:
description: information about the Pod's initContainers
items:
properties:
name:
description: the name of the container
type: string
required:
- name
type: object
type: array
name:
description: the name of the Pod resource
type: string
required:
- name
type: object
state:
default: Unknown
description: the current state of the Workspace
@ -296,6 +329,7 @@ spec:
- pauseTime
- pendingRestart
- podTemplateOptions
- podTemplatePod
- state
- stateMessage
type: object

View File

@ -19,21 +19,20 @@ package controller
import (
"context"
"fmt"
"reflect"
"strings"
"k8s.io/apimachinery/pkg/util/intstr"
"github.com/go-logr/logr"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/utils/ptr"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
@ -52,6 +51,9 @@ const (
workspaceNameLabel = "notebooks.kubeflow.org/workspace-name"
workspaceSelectorLabel = "statefulset"
// pod template constants
workspacePodTemplateContainerName = "main"
// lengths for resource names
generateNameSuffixLength = 6
maxServiceNameLength = 63
@ -118,6 +120,11 @@ func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
return ctrl.Result{}, nil
}
// copy the current Workspace status, so we can avoid unnecessary updates if the status hasn't changed
// NOTE: `DeepCopy()` returns a pointer, but `workspace.Status` is a struct value (NOT a pointer),
// so we dereference the copy; otherwise `equality.Semantic.DeepEqual` would compare a pointer
// against a value and always return false.
currentStatus := *workspace.Status.DeepCopy()
// fetch the WorkspaceKind
workspaceKindName := workspace.Spec.Kind
log = log.WithValues("workspaceKind", workspaceKindName)
@ -150,6 +157,10 @@ func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}
}
// a restart pending means at least one current config is different from its desired config
// NOTE: we initialize this to false and only set it to true if we find a difference
workspace.Status.PendingRestart = false
// get the current and desired (after redirects) imageConfig
currentImageConfig, desiredImageConfig, imageConfigRedirectChain, err := getImageConfig(workspace, workspaceKind)
if err != nil {
@ -192,10 +203,14 @@ func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
// will result in a forced restart of all Workspaces using the WorkspaceKind.
//
// if the Workspace is paused and a restart is pending, update the Workspace with the new options
if *workspace.Spec.Paused && workspace.Status.PendingRestart && !*workspace.Spec.DeferUpdates {
// if a restart is pending and the Workspace is paused, update the Workspace with the new options
if workspace.Status.PendingRestart && *workspace.Spec.Paused && !*workspace.Spec.DeferUpdates {
// update the Workspace with the new options
workspace.Spec.PodTemplate.Options.ImageConfig = workspace.Status.PodTemplateOptions.ImageConfig.Desired
workspace.Spec.PodTemplate.Options.PodConfig = workspace.Status.PodTemplateOptions.PodConfig.Desired
// update the Workspace
if err := r.Update(ctx, workspace); err != nil {
if apierrors.IsConflict(err) {
log.V(2).Info("update conflict while updating Workspace, will requeue")
@ -204,15 +219,9 @@ func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
log.Error(err, "unable to update Workspace")
return ctrl.Result{}, err
}
workspace.Status.PendingRestart = false
if err := r.Status().Update(ctx, workspace); err != nil {
if apierrors.IsConflict(err) {
log.V(2).Info("update conflict while updating Workspace status, will requeue")
return ctrl.Result{Requeue: true}, nil
}
log.Error(err, "unable to update Workspace status")
return ctrl.Result{}, err
}
// return and requeue to pick up the changes
return ctrl.Result{Requeue: true}, nil
}
// generate StatefulSet
@ -362,13 +371,15 @@ func (r *WorkspaceReconciler) Reconcile(ctx context.Context, req ctrl.Request) (
}
}
// update Workspace status
// populate the Workspace status
workspaceStatus, err := r.generateWorkspaceStatus(ctx, log, workspace, pod, statefulSet)
if err != nil {
return ctrl.Result{}, err
}
if !reflect.DeepEqual(workspace.Status, workspaceStatus) {
workspace.Status = workspaceStatus
workspace.Status = workspaceStatus
// update the Workspace status, if it has changed
if !equality.Semantic.DeepEqual(currentStatus, workspace.Status) {
if err := r.Status().Update(ctx, workspace); err != nil {
if apierrors.IsConflict(err) {
log.V(2).Info("update conflict while updating Workspace status, will requeue")
@ -796,7 +807,7 @@ func generateStatefulSet(workspace *kubefloworgv1beta1.Workspace, workspaceKind
Affinity: podConfigSpec.Affinity,
Containers: []corev1.Container{
{
Name: "main",
Name: workspacePodTemplateContainerName,
Image: imageConfigSpec.Image,
ImagePullPolicy: imagePullPolicy,
Ports: containerPorts,
@ -870,11 +881,14 @@ func generateService(workspace *kubefloworgv1beta1.Workspace, imageConfigSpec ku
// generateWorkspaceStatus generates a WorkspaceStatus for a Workspace
func (r *WorkspaceReconciler) generateWorkspaceStatus(ctx context.Context, log logr.Logger, workspace *kubefloworgv1beta1.Workspace, pod *corev1.Pod, statefulSet *appsv1.StatefulSet) (kubefloworgv1beta1.WorkspaceStatus, error) {
// NOTE: some fields are populated before this function is called,
// including `status.pendingRestart` and `status.podTemplateOptions`
status := workspace.Status
// if workspace is paused, update the `status.pauseTime`
// NOTE: when the workspace is not paused, the pauseTime should be 0
if *workspace.Spec.Paused {
workspacePaused := ptr.Deref(workspace.Spec.Paused, false)
if workspacePaused {
if status.PauseTime == 0 {
status.PauseTime = metav1.Now().Unix()
}
@ -884,13 +898,65 @@ func (r *WorkspaceReconciler) generateWorkspaceStatus(ctx context.Context, log l
}
}
// populate the pod information
status.PodTemplatePod = generateWorkspacePodStatus(pod)
// populate the workspace state and state message
workspaceState, workspaceStateMessage, err := r.generateWorkspaceState(ctx, log, workspacePaused, statefulSet, pod)
if err != nil {
return status, err
}
status.State = workspaceState
status.StateMessage = workspaceStateMessage
return status, nil
}
// generateWorkspacePodStatus builds the WorkspacePodStatus that describes the given Pod.
// A nil Pod produces the zero-value status; otherwise the Pod's name is recorded together
// with the name of every container and initContainer, in the order they appear in the Pod spec.
func generateWorkspacePodStatus(pod *corev1.Pod) kubefloworgv1beta1.WorkspacePodStatus {
	// without a Pod there is nothing to describe
	if pod == nil {
		return kubefloworgv1beta1.WorkspacePodStatus{}
	}

	// toPodContainers maps container specs to their status entries, keeping only the name
	toPodContainers := func(specs []corev1.Container) []kubefloworgv1beta1.WorkspacePodContainer {
		entries := make([]kubefloworgv1beta1.WorkspacePodContainer, 0, len(specs))
		for idx := range specs {
			entries = append(entries, kubefloworgv1beta1.WorkspacePodContainer{Name: specs[idx].Name})
		}
		return entries
	}

	return kubefloworgv1beta1.WorkspacePodStatus{
		Name:           pod.Name,
		Containers:     toPodContainers(pod.Spec.Containers),
		InitContainers: toPodContainers(pod.Spec.InitContainers),
	}
}
// generateWorkspaceState gets current state and stateMessage for a Workspace
func (r *WorkspaceReconciler) generateWorkspaceState(ctx context.Context, log logr.Logger, paused bool, statefulSet *appsv1.StatefulSet, pod *corev1.Pod) (kubefloworgv1beta1.WorkspaceState, string, error) {
state := kubefloworgv1beta1.WorkspaceStateUnknown
stateMessage := stateMsgUnknown
// cases where the Pod does not exist
if pod == nil {
// STATUS: Paused
if *workspace.Spec.Paused {
status.State = kubefloworgv1beta1.WorkspaceStatePaused
status.StateMessage = stateMsgPaused
return status, nil
if paused {
state = kubefloworgv1beta1.WorkspaceStatePaused
stateMessage = stateMsgPaused
return state, stateMessage, nil
}
// there might be StatefulSet events
@ -901,7 +967,7 @@ func (r *WorkspaceReconciler) generateWorkspaceStatus(ctx context.Context, log l
}
if err := r.List(ctx, statefulSetEvents, listOpts); err != nil {
log.Error(err, "unable to list StatefulSet events")
return status, err
return state, stateMessage, err
}
// find the last StatefulSet warning event
@ -921,9 +987,9 @@ func (r *WorkspaceReconciler) generateWorkspaceStatus(ctx context.Context, log l
// STATUS: Error (StatefulSet warning event)
if lastStsWarningEvent != nil {
status.State = kubefloworgv1beta1.WorkspaceStateError
status.StateMessage = fmt.Sprintf(stateMsgErrorStatefulSetWarningEvent, lastStsWarningEvent.Message)
return status, nil
state = kubefloworgv1beta1.WorkspaceStateError
stateMessage = fmt.Sprintf(stateMsgErrorStatefulSetWarningEvent, lastStsWarningEvent.Message)
return state, stateMessage, nil
}
}
@ -931,9 +997,9 @@ func (r *WorkspaceReconciler) generateWorkspaceStatus(ctx context.Context, log l
if pod != nil {
// STATUS: Terminating
if pod.GetDeletionTimestamp() != nil {
status.State = kubefloworgv1beta1.WorkspaceStateTerminating
status.StateMessage = stateMsgTerminating
return status, nil
state = kubefloworgv1beta1.WorkspaceStateTerminating
stateMessage = stateMsgTerminating
return state, stateMessage, nil
}
// get the pod phase
@ -961,32 +1027,32 @@ func (r *WorkspaceReconciler) generateWorkspaceStatus(ctx context.Context, log l
if !podScheduled {
switch podScheduledCondition.Reason {
case corev1.PodReasonUnschedulable:
status.State = kubefloworgv1beta1.WorkspaceStateError
status.StateMessage = fmt.Sprintf(stateMsgErrorPodUnschedulable, podScheduledCondition.Message)
return status, nil
state = kubefloworgv1beta1.WorkspaceStateError
stateMessage = fmt.Sprintf(stateMsgErrorPodUnschedulable, podScheduledCondition.Message)
return state, stateMessage, nil
case corev1.PodReasonSchedulingGated:
status.State = kubefloworgv1beta1.WorkspaceStateError
status.StateMessage = fmt.Sprintf(stateMsgErrorPodSchedulingGate, podScheduledCondition.Message)
return status, nil
state = kubefloworgv1beta1.WorkspaceStateError
stateMessage = fmt.Sprintf(stateMsgErrorPodSchedulingGate, podScheduledCondition.Message)
return state, stateMessage, nil
case corev1.PodReasonSchedulerError:
status.State = kubefloworgv1beta1.WorkspaceStateError
status.StateMessage = fmt.Sprintf(stateMsgErrorPodSchedulerError, podScheduledCondition.Message)
return status, nil
state = kubefloworgv1beta1.WorkspaceStateError
stateMessage = fmt.Sprintf(stateMsgErrorPodSchedulerError, podScheduledCondition.Message)
return state, stateMessage, nil
}
}
// STATUS: Running
if podPhase == corev1.PodRunning && podReady {
status.State = kubefloworgv1beta1.WorkspaceStateRunning
status.StateMessage = stateMsgRunning
return status, nil
state = kubefloworgv1beta1.WorkspaceStateRunning
stateMessage = stateMsgRunning
return state, stateMessage, nil
}
// get container status
// https://kubernetes.io/docs/concepts/workloads/pods/pod-lifecycle/#container-states
var containerStatus corev1.ContainerStatus
for _, container := range pod.Status.ContainerStatuses {
if container.Name == "main" {
if container.Name == workspacePodTemplateContainerName {
containerStatus = container
break
}
@ -998,27 +1064,24 @@ func (r *WorkspaceReconciler) generateWorkspaceStatus(ctx context.Context, log l
// STATUS: Error (container state)
if containerState.Waiting != nil {
if containerState.Waiting.Reason == "CrashLoopBackOff" {
status.State = kubefloworgv1beta1.WorkspaceStateError
status.StateMessage = stateMsgErrorContainerCrashLoopBackOff
return status, nil
state = kubefloworgv1beta1.WorkspaceStateError
stateMessage = stateMsgErrorContainerCrashLoopBackOff
return state, stateMessage, nil
}
if containerState.Waiting.Reason == "ImagePullBackOff" {
status.State = kubefloworgv1beta1.WorkspaceStateError
status.StateMessage = stateMsgErrorContainerImagePullBackOff
return status, nil
state = kubefloworgv1beta1.WorkspaceStateError
stateMessage = stateMsgErrorContainerImagePullBackOff
return state, stateMessage, nil
}
}
// STATUS: Pending
if podPhase == corev1.PodPending {
status.State = kubefloworgv1beta1.WorkspaceStatePending
status.StateMessage = stateMsgPending
return status, nil
state = kubefloworgv1beta1.WorkspaceStatePending
stateMessage = stateMsgPending
return state, stateMessage, nil
}
}
// STATUS: Unknown
status.State = kubefloworgv1beta1.WorkspaceStateUnknown
status.StateMessage = stateMsgUnknown
return status, nil
return state, stateMessage, nil
}

View File

@ -19,21 +19,20 @@ package controller
import (
"context"
"k8s.io/apimachinery/pkg/api/equality"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/log"
kubefloworgv1beta1 "github.com/kubeflow/notebooks/workspaces/controller/api/v1beta1"
"github.com/kubeflow/notebooks/workspaces/controller/internal/helper"
)
@ -71,6 +70,11 @@ func (r *WorkspaceKindReconciler) Reconcile(ctx context.Context, req ctrl.Reques
return ctrl.Result{}, err
}
// copy the current WorkspaceKind status, so we can avoid unnecessary updates if the status hasn't changed
// NOTE: `DeepCopy()` returns a pointer, but `workspaceKind.Status` is a struct value (NOT a pointer),
// so we dereference the copy; otherwise `equality.Semantic.DeepEqual` would compare a pointer
// against a value and always return false.
currentStatus := *workspaceKind.Status.DeepCopy()
// fetch all Workspaces that are using this WorkspaceKind
workspaces := &kubefloworgv1beta1.WorkspaceList{}
listOpts := &client.ListOptions{
@ -128,17 +132,21 @@ func (r *WorkspaceKindReconciler) Reconcile(ctx context.Context, req ctrl.Reques
}
}
// update the WorkspaceKind status
// populate the WorkspaceKind status
workspaceKind.Status.Workspaces = int32(numWorkspace) //nolint:gosec
workspaceKind.Status.PodTemplateOptions.ImageConfig = imageConfigMetrics
workspaceKind.Status.PodTemplateOptions.PodConfig = podConfigMetrics
if err := r.Status().Update(ctx, workspaceKind); err != nil {
if apierrors.IsConflict(err) {
log.V(2).Info("update conflict while updating WorkspaceKind status, will requeue")
return ctrl.Result{Requeue: true}, nil
// update the WorkspaceKind status, if it has changed
if !equality.Semantic.DeepEqual(currentStatus, workspaceKind.Status) {
if err := r.Status().Update(ctx, workspaceKind); err != nil {
if apierrors.IsConflict(err) {
log.V(2).Info("update conflict while updating WorkspaceKind status, will requeue")
return ctrl.Result{Requeue: true}, nil
}
log.Error(err, "unable to update WorkspaceKind status")
return ctrl.Result{}, err
}
log.Error(err, "unable to update WorkspaceKind status")
return ctrl.Result{}, err
}
return ctrl.Result{}, nil