From 6fc3696bbdf983b69357df7f5e3d23e080986d0c Mon Sep 17 00:00:00 2001 From: Shingo Omura Date: Sat, 18 Jan 2025 02:59:41 +0900 Subject: [PATCH] Implement Suspend/Resume reconciliation in SparkApplication controller and its unit tests Signed-off-by: Shingo Omura --- .../controller/sparkapplication/controller.go | 212 ++++++++++- .../sparkapplication/controller_test.go | 339 ++++++++++++++++++ .../sparkapplication/event_filter.go | 11 +- pkg/common/event.go | 6 + 4 files changed, 554 insertions(+), 14 deletions(-) diff --git a/internal/controller/sparkapplication/controller.go b/internal/controller/sparkapplication/controller.go index 9c63a0d1..126f7898 100644 --- a/internal/controller/sparkapplication/controller.go +++ b/internal/controller/sparkapplication/controller.go @@ -130,6 +130,10 @@ func NewReconciler( // Reconcile handles Create, Update and Delete events of the custom resource. // State Machine for SparkApplication: +// NOTE: +// - Suspending can be transitioned from any state except for Terminated(Failed or Completed) and Suspended +// by setting Spec.Suspend=True (depicted in ** in below diagram) +// // +--------------------------------------------------------------------------------------------------------------------+ // | +---------------------------------------------------------------------------------------------+ | // | | +----------+ | | @@ -149,20 +153,28 @@ func NewReconciler( // | | | | | | | | | | | | | // | | | | | | | | | | | | | // | | | | | | | | | | | | | -// | +---------+ | +----^-----+ +-----+----+ +-----+-----+ +----------+ | -// | | | | | | -// | | | | | | -// | +------------+ | | +-------------------------------+ | -// | | | | +-----+-----+ | | +-----------+ +----------+ | -// | | | | | Pending | | | | | | | | +// | +---------+ | +----^--^--+ +-----+----+ +-----+-----+ +----------+ | +// | | | | | | | +// | | | +------+ | | | +// | +------------+ | | | +-------------------------------+ | +// | | | | +-----+-----+ | | | +-----------+ +----------+ | +// | | | | | Pending | | | | | | | | | // | | | +---+ Rerun <-------+ +---------------->Succeeding +---------->Completed | | // | |Invalidating| | <-------+ | | | | | -// | | +-------> | | | | | | | -// | | | | | | | | | | | -// | | | +-----------+ | +-----+-----+ +----------+ | -// | +------------+ | | | -// | | | | -// | +-------------------------------+ | +// | | +-------> | | | | | | | | +// | | | | | | | | | | | | +// | | | +-----------+ | | +-----+-----+ +----------+ | +// | +------------+ | | | | +// | | | | | +// | | +-------------------------------+ | +// | | | +// | +-----+----+ +-----------+ +------------+ +----+ | +// | | | | | | | | | | +// | | | | | | | | | | +// | | Resuming <----- + Suspended <------+ Suspending <---------+ ** | | +// | | | | | | | | | | +// | | | | | | | | | | +// | +----------+ +-----------+ +------------+ +----+ | // | | // +--------------------------------------------------------------------------------------------------------------------+ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { @@ -181,6 +193,15 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu if !app.DeletionTimestamp.IsZero() { return r.handleSparkApplicationDeletion(ctx, req) } + + if app.Spec.Suspend { + if !(util.IsTerminated(app) || + app.Status.AppState.State == v1beta2.ApplicationStateSuspended || + app.Status.AppState.State == v1beta2.ApplicationStateSuspending) { + return r.transitionToSuspending(ctx, req) + } + } + switch app.Status.AppState.State { case v1beta2.ApplicationStateNew: return r.reconcileNewSparkApplication(ctx, req) @@ -204,6 +225,12 @@ func (r *Reconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Resu return r.reconcileFailedSparkApplication(ctx, req) case v1beta2.ApplicationStateUnknown: return r.reconcileUnknownSparkApplication(ctx, req) + case v1beta2.ApplicationStateSuspended: + return r.reconcileSuspendedSparkApplication(ctx, req) + case v1beta2.ApplicationStateSuspending: + return r.reconcileSuspendingSparkApplication(ctx, req) + case v1beta2.ApplicationStateResuming: + return r.reconcileResumingSparkApplication(ctx, req) } return ctrl.Result{}, nil } @@ -634,6 +661,138 @@ func (r *Reconciler) reconcileUnknownSparkApplication(ctx context.Context, req c return ctrl.Result{}, nil } +func (r *Reconciler) reconcileSuspendingSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + key := req.NamespacedName + retryErr := retry.RetryOnConflict( + retry.DefaultRetry, + func() error { + old, err := r.getSparkApplication(ctx, key) + if err != nil { + return err + } + if old.Status.AppState.State != v1beta2.ApplicationStateSuspending { + return nil + } + app := old.DeepCopy() + + r.recordSparkApplicationEvent(app) + + if err := r.deleteSparkResources(ctx, app); err != nil { + logger.Error(err, "failed to delete spark resources", "name", app.Name, "namespace", app.Namespace) + return err + } + + app.Status.AppState = v1beta2.ApplicationState{ + State: v1beta2.ApplicationStateSuspended, + } + r.resetSparkApplicationStatus(app) + + if err := r.updateSparkApplicationStatus(ctx, app); err != nil { + return err + } + return nil + }, + ) + if retryErr != nil { + logger.Error(retryErr, "Failed to reconcile SparkApplication", "name", key.Name, "namespace", key.Namespace) + return ctrl.Result{}, retryErr + } + return ctrl.Result{}, nil +} + +func (r *Reconciler) reconcileSuspendedSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + key := req.NamespacedName + retryErr := retry.RetryOnConflict( + retry.DefaultRetry, + func() error { + old, err := r.getSparkApplication(ctx, key) + if err != nil { + return err + } + if old.Status.AppState.State != v1beta2.ApplicationStateSuspended { + return nil + } + app := old.DeepCopy() + + if r.validateSparkResourceDeletion(ctx, app) { + logger.Info("Successfully deleted resources associated with SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State) + r.resetSparkApplicationStatus(app) + r.recordSparkApplicationEvent(app) + if !app.Spec.Suspend { + app.Status.AppState = v1beta2.ApplicationState{ + State: v1beta2.ApplicationStateResuming, + } + } + } + if err := r.updateSparkApplicationStatus(ctx, app); err != nil { + return err + } + return nil + }, + ) + if retryErr != nil { + logger.Error(retryErr, "Failed to reconcile SparkApplication", "name", key.Name, "namespace", key.Namespace) + return ctrl.Result{}, retryErr + } + return ctrl.Result{}, nil +} + +func (r *Reconciler) reconcileResumingSparkApplication(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + key := req.NamespacedName + retryErr := retry.RetryOnConflict( + retry.DefaultRetry, + func() error { + old, err := r.getSparkApplication(ctx, key) + if err != nil { + return err + } + if old.Status.AppState.State != v1beta2.ApplicationStateResuming { + return nil + } + app := old.DeepCopy() + + r.recordSparkApplicationEvent(app) + + _ = r.submitSparkApplication(app) + if err := r.updateSparkApplicationStatus(ctx, app); err != nil { + return err + } + return nil + }, + ) + if retryErr != nil { + logger.Error(retryErr, "Failed to reconcile SparkApplication", "name", key.Name, "namespace", key.Namespace) + return ctrl.Result{Requeue: true}, retryErr + } + return ctrl.Result{}, nil +} + +func (r *Reconciler) transitionToSuspending(ctx context.Context, req ctrl.Request) (ctrl.Result, error) { + key := req.NamespacedName + retryErr := retry.RetryOnConflict( + retry.DefaultRetry, + func() error { + old, err := r.getSparkApplication(ctx, key) + if err != nil { + return err + } + app := old.DeepCopy() + + app.Status.AppState.State = v1beta2.ApplicationStateSuspending + if err := r.updateSparkApplicationStatus(ctx, app); err != nil { + return err + } + + return nil + }, + ) + if retryErr != nil { + logger.Error(retryErr, "Failed to reconcile SparkApplication", "name", key.Name, "namespace", key.Namespace) + return ctrl.Result{Requeue: true}, retryErr + } + return ctrl.Result{}, nil +} + // getSparkApplication gets the SparkApplication with the given name and namespace. func (r *Reconciler) getSparkApplication(ctx context.Context, key types.NamespacedName) (*v1beta2.SparkApplication, error) { app := &v1beta2.SparkApplication{} @@ -1128,6 +1287,30 @@ func (r *Reconciler) recordSparkApplicationEvent(app *v1beta2.SparkApplication) "SparkApplication %s is pending rerun", app.Name, ) + case v1beta2.ApplicationStateSuspending: + r.recorder.Eventf( + app, + corev1.EventTypeWarning, + common.EventSparkApplicationSuspending, + "SparkApplication %s is suspending", + app.Name, + ) + case v1beta2.ApplicationStateSuspended: + r.recorder.Eventf( + app, + corev1.EventTypeWarning, + common.EventSparkApplicationSuspended, + "SparkApplication %s is suspended", + app.Name, + ) + case v1beta2.ApplicationStateResuming: + r.recorder.Eventf( + app, + corev1.EventTypeWarning, + common.EventSparkApplicationResuming, + "SparkApplication %s is resuming", + app.Name, + ) } } @@ -1179,6 +1362,11 @@ func (r *Reconciler) resetSparkApplicationStatus(app *v1beta2.SparkApplication) status.DriverInfo = v1beta2.DriverInfo{} status.AppState.ErrorMessage = "" status.ExecutorState = nil + case v1beta2.ApplicationStateSuspended: + status.SparkApplicationID = "" + status.AppState.ErrorMessage = "" + status.DriverInfo = v1beta2.DriverInfo{} + status.ExecutorState = nil } } diff --git a/internal/controller/sparkapplication/controller_test.go b/internal/controller/sparkapplication/controller_test.go index 704ad4ee..07d44ac1 100644 --- a/internal/controller/sparkapplication/controller_test.go +++ b/internal/controller/sparkapplication/controller_test.go @@ -29,6 +29,7 @@ import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/tools/record" + "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/reconcile" "github.com/kubeflow/spark-operator/api/v1beta2" @@ -550,6 +551,344 @@ var _ = Describe("SparkApplication Controller", func() { Expect(app.Status.ExecutorState).To(HaveLen(1)) }) }) + + Context("Suspend and Resume", func() { + ctx := context.Background() + appName := "test" + appNamespace := "default" + key := types.NamespacedName{ + Name: appName, + Namespace: appNamespace, + } + + var app *v1beta2.SparkApplication + BeforeEach(func() { + app = &v1beta2.SparkApplication{ + ObjectMeta: metav1.ObjectMeta{ + Name: appName, + Namespace: appNamespace, + }, + Spec: v1beta2.SparkApplicationSpec{ + MainApplicationFile: util.StringPtr("local:///dummy.jar"), + }, + } + v1beta2.SetSparkApplicationDefaults(app) + }) + + Context("Suspend", func() { + When("reconciling a new SparkApplication with Suspend=True", func() { + BeforeEach(func() { + app.Spec.Suspend = true + Expect(k8sClient.Create(ctx, app)).To(Succeed()) + }) + AfterEach(func() { + app := &v1beta2.SparkApplication{} + Expect(k8sClient.Get(ctx, key, app)).To(Succeed()) + + By("Deleting the created test SparkApplication") + Expect(k8sClient.Delete(ctx, app)).To(Succeed()) + }) + It("should transition to Suspending state", func() { + By("Reconciling the new SparkApplication with Suspend=true") + reconciler := sparkapplication.NewReconciler( + nil, + k8sClient.Scheme(), + k8sClient, + record.NewFakeRecorder(3), + nil, + sparkapplication.Options{Namespaces: []string{appNamespace}, MaxTrackedExecutorPerApp: 10}, + ) + result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.Requeue).To(BeFalse()) + + app := &v1beta2.SparkApplication{} + Expect(k8sClient.Get(ctx, key, app)).NotTo(HaveOccurred()) + Expect(app.Status.AppState).To(BeEquivalentTo(v1beta2.ApplicationState{State: v1beta2.ApplicationStateSuspending})) + }) + }) + When("reconciling a Non-Terminated(e.g. Running) SparkApplication with Suspend=true", func() { + BeforeEach(func() { + Expect(k8sClient.Create(ctx, app)).To(Succeed()) + + driverPod := createDriverPod(appName, appNamespace) + Expect(k8sClient.Create(ctx, driverPod)).To(Succeed()) + driverPod.Status.Phase = corev1.PodRunning + Expect(k8sClient.Status().Update(ctx, driverPod)).To(Succeed()) + + app.Status.DriverInfo.PodName = driverPod.Name + app.Status.AppState.State = v1beta2.ApplicationStateRunning + Expect(k8sClient.Status().Update(ctx, app)).To(Succeed()) + + executorPod1 := createExecutorPod(appName, appNamespace, 1) + Expect(k8sClient.Create(ctx, executorPod1)).To(Succeed()) + executorPod1.Status.Phase = corev1.PodRunning + Expect(k8sClient.Status().Update(ctx, executorPod1)).To(Succeed()) + + executorPod2 := createExecutorPod(appName, appNamespace, 2) + Expect(k8sClient.Create(ctx, executorPod2)).To(Succeed()) + executorPod2.Status.Phase = corev1.PodRunning + Expect(k8sClient.Status().Update(ctx, executorPod2)).To(Succeed()) + + app.Spec.Suspend = true + Expect(k8sClient.Update(ctx, app)).To(Succeed()) + }) + AfterEach(func() { + app := &v1beta2.SparkApplication{} + Expect(k8sClient.Get(ctx, key, app)).To(Succeed()) + + By("Deleting the created test SparkApplication") + Expect(k8sClient.Delete(ctx, app)).To(Succeed()) + + By("Deleting the driver pod") + driverPod := &corev1.Pod{} + Expect(k8sClient.Get(ctx, getDriverNamespacedName(appName, appNamespace), driverPod)).To(Succeed()) + Expect(k8sClient.Delete(ctx, driverPod)).To(Succeed()) + + By("Deleting the executor pods") + executorPod1 := &corev1.Pod{} + Expect(k8sClient.Get(ctx, getExecutorNamespacedName(appName, appNamespace, 1), executorPod1)).To(Succeed()) + Expect(k8sClient.Delete(ctx, executorPod1)).To(Succeed()) + executorPod2 := &corev1.Pod{} + Expect(k8sClient.Get(ctx, getExecutorNamespacedName(appName, appNamespace, 2), executorPod2)).To(Succeed()) + Expect(k8sClient.Delete(ctx, executorPod2)).To(Succeed()) + }) + It("should transition to Suspending state", func() { + By("Reconciling the Running SparkApplication with Suspend=true") + reconciler := sparkapplication.NewReconciler( + nil, + k8sClient.Scheme(), + k8sClient, + record.NewFakeRecorder(3), + nil, + sparkapplication.Options{Namespaces: []string{appNamespace}, MaxTrackedExecutorPerApp: 10}, + ) + result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.Requeue).To(BeFalse()) + + app := &v1beta2.SparkApplication{} + Expect(k8sClient.Get(ctx, key, app)).NotTo(HaveOccurred()) + Expect(app.Status.AppState).To(BeEquivalentTo(v1beta2.ApplicationState{State: v1beta2.ApplicationStateSuspending})) + }) + }) + When("reconciling a Terminated(Failed or Completed) SparkApplication with Suspend=true", func() { + BeforeEach(func() { + app.Spec.Suspend = true + Expect(k8sClient.Create(ctx, app)).To(Succeed()) + app.Status.AppState.State = v1beta2.ApplicationStateFailed + Expect(k8sClient.Status().Update(ctx, app)).To(Succeed()) + }) + AfterEach(func() { + app := &v1beta2.SparkApplication{} + Expect(k8sClient.Get(ctx, key, app)).To(Succeed()) + + By("Deleting the created test SparkApplication") + Expect(k8sClient.Delete(ctx, app)).To(Succeed()) + }) + It("should not change its application state", func() { + By("Reconciling the terminated SparkApplication with Suspend=true") + reconciler := sparkapplication.NewReconciler( + nil, + k8sClient.Scheme(), + k8sClient, + record.NewFakeRecorder(3), + nil, + sparkapplication.Options{Namespaces: []string{appNamespace}, MaxTrackedExecutorPerApp: 10}, + ) + result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.Requeue).To(BeFalse()) + + app := &v1beta2.SparkApplication{} + Expect(k8sClient.Get(ctx, key, app)).NotTo(HaveOccurred()) + Expect(app.Status.AppState).To(BeEquivalentTo(v1beta2.ApplicationState{State: v1beta2.ApplicationStateFailed})) + }) + }) + When("reconciling Resuming SparkApplication with Suspend=true", func() { + BeforeEach(func() { + app.Spec.Suspend = true + Expect(k8sClient.Create(ctx, app)).To(Succeed()) + app.Status.AppState.State = v1beta2.ApplicationStateResuming + Expect(k8sClient.Status().Update(ctx, app)).To(Succeed()) + }) + AfterEach(func() { + app := &v1beta2.SparkApplication{} + Expect(k8sClient.Get(ctx, key, app)).To(Succeed()) + + By("Deleting the created test SparkApplication") + Expect(k8sClient.Delete(ctx, app)).To(Succeed()) + }) + It("should transition to Suspending state", func() { + By("Reconciling the Resuming SparkApplication with Suspend=true") + reconciler := sparkapplication.NewReconciler( + nil, + k8sClient.Scheme(), + k8sClient, + record.NewFakeRecorder(3), + nil, + sparkapplication.Options{Namespaces: []string{appNamespace}, MaxTrackedExecutorPerApp: 10}, + ) + result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.Requeue).To(BeFalse()) + + app := &v1beta2.SparkApplication{} + Expect(k8sClient.Get(ctx, key, app)).NotTo(HaveOccurred()) + Expect(app.Status.AppState).To(BeEquivalentTo(v1beta2.ApplicationState{State: v1beta2.ApplicationStateSuspending})) + }) + }) + When("reconciling a Suspending SparkApplication with Suspend=true", func() { + var driverPod *corev1.Pod + BeforeEach(func() { + app.Spec.Suspend = true + Expect(k8sClient.Create(ctx, app)).To(Succeed()) + + driverPod = createDriverPod(appName, appNamespace) + Expect(k8sClient.Create(ctx, driverPod)).To(Succeed()) + driverPod.Status.Phase = corev1.PodRunning + Expect(k8sClient.Status().Update(ctx, driverPod)).To(Succeed()) + + app.Status.DriverInfo.PodName = driverPod.Name + Expect(k8sClient.Status().Update(ctx, app)).To(Succeed()) + + app.Status.AppState.State = v1beta2.ApplicationStateSuspending + Expect(k8sClient.Status().Update(ctx, app)).To(Succeed()) + }) + AfterEach(func() { + app := &v1beta2.SparkApplication{} + Expect(k8sClient.Get(ctx, key, app)).To(Succeed()) + + By("Deleting the created test SparkApplication") + Expect(k8sClient.Delete(ctx, app)).To(Succeed()) + }) + It("should delete dependent spark resources and transition to Suspended app state", func() { + By("Reconciling the Suspending SparkApplication with Suspend=true") + reconciler := sparkapplication.NewReconciler( + nil, + k8sClient.Scheme(), + k8sClient, + record.NewFakeRecorder(3), + nil, + sparkapplication.Options{Namespaces: []string{appNamespace}, MaxTrackedExecutorPerApp: 10}, + ) + result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.Requeue).To(BeFalse()) + + Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(driverPod), &corev1.Pod{})).To(Satisfy(errors.IsNotFound)) + app := &v1beta2.SparkApplication{} + Expect(k8sClient.Get(ctx, key, app)).NotTo(HaveOccurred()) + Expect(app.Status.AppState).To(BeEquivalentTo(v1beta2.ApplicationState{State: v1beta2.ApplicationStateSuspended})) + }) + }) + }) + Context("Resume", func() { + When("reconciling Suspended SparkApplication with Suspend=false(resuming)", func() { + BeforeEach(func() { + app.Spec.Suspend = false + Expect(k8sClient.Create(ctx, app)).To(Succeed()) + app.Status.AppState.State = v1beta2.ApplicationStateSuspended + Expect(k8sClient.Status().Update(ctx, app)).To(Succeed()) + }) + AfterEach(func() { + app := &v1beta2.SparkApplication{} + Expect(k8sClient.Get(ctx, key, app)).To(Succeed()) + + By("Deleting the created test SparkApplication") + Expect(k8sClient.Delete(ctx, app)).To(Succeed()) + }) + It("should transition to Resuming state", func() { + By("Reconciling the Suspended SparkApplication with Suspend=false") + reconciler := sparkapplication.NewReconciler( + nil, + k8sClient.Scheme(), + k8sClient, + record.NewFakeRecorder(3), + nil, + sparkapplication.Options{Namespaces: []string{appNamespace}, MaxTrackedExecutorPerApp: 10}, + ) + result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.Requeue).To(BeFalse()) + + app := &v1beta2.SparkApplication{} + Expect(k8sClient.Get(ctx, key, app)).NotTo(HaveOccurred()) + Expect(app.Status.AppState).To(BeEquivalentTo(v1beta2.ApplicationState{State: v1beta2.ApplicationStateResuming})) + }) + }) + When("reconciling Suspending SparkApplication with Suspend=false(resuming)", func() { + var driverPod *corev1.Pod + BeforeEach(func() { + app.Spec.Suspend = false + Expect(k8sClient.Create(ctx, app)).To(Succeed()) + + driverPod = createDriverPod(appName, appNamespace) + Expect(k8sClient.Create(ctx, driverPod)).To(Succeed()) + driverPod.Status.Phase = corev1.PodRunning + Expect(k8sClient.Status().Update(ctx, driverPod)).To(Succeed()) + + app.Status.DriverInfo.PodName = driverPod.Name + Expect(k8sClient.Status().Update(ctx, app)).To(Succeed()) + + app.Status.AppState.State = v1beta2.ApplicationStateSuspending + Expect(k8sClient.Status().Update(ctx, app)).To(Succeed()) + }) + AfterEach(func() { + app := &v1beta2.SparkApplication{} + Expect(k8sClient.Get(ctx, key, app)).To(Succeed()) + + By("Deleting the created test SparkApplication") + Expect(k8sClient.Delete(ctx, app)).To(Succeed()) + }) + It("should transition to Suspended -> Resuming state", func() { + By("Reconciling the Suspending SparkApplication with Suspend=false") + reconciler := sparkapplication.NewReconciler( + nil, + k8sClient.Scheme(), + k8sClient, + record.NewFakeRecorder(3), + nil, + sparkapplication.Options{Namespaces: []string{appNamespace}, MaxTrackedExecutorPerApp: 10}, + ) + result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.Requeue).To(BeFalse()) + + Expect(k8sClient.Get(ctx, client.ObjectKeyFromObject(driverPod), &corev1.Pod{})).To(Satisfy(errors.IsNotFound)) + app := &v1beta2.SparkApplication{} + Expect(k8sClient.Get(ctx, key, app)).NotTo(HaveOccurred()) + Expect(app.Status.AppState).To(BeEquivalentTo(v1beta2.ApplicationState{State: v1beta2.ApplicationStateSuspended})) + + By("Reconciling the Suspended SparkApplication with Suspend=false") + result, err = reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key}) + Expect(err).NotTo(HaveOccurred()) + Expect(result.Requeue).To(BeFalse()) + app = &v1beta2.SparkApplication{} + Expect(k8sClient.Get(ctx, key, app)).NotTo(HaveOccurred()) + Expect(app.Status.AppState).To(BeEquivalentTo(v1beta2.ApplicationState{State: v1beta2.ApplicationStateResuming})) + }) + }) + When("reconciling Resuming SparkApplication with Suspend=false(resuming)", func() { + BeforeEach(func() { + app.Spec.Suspend = false + Expect(k8sClient.Create(ctx, app)).To(Succeed()) + app.Status.AppState.State = v1beta2.ApplicationStateResuming + Expect(k8sClient.Status().Update(ctx, app)).To(Succeed()) + }) + AfterEach(func() { + app := &v1beta2.SparkApplication{} + Expect(k8sClient.Get(ctx, key, app)).To(Succeed()) + + By("Deleting the created test SparkApplication") + Expect(k8sClient.Delete(ctx, app)).To(Succeed()) + }) + It("should submit pods and transition to Submitted", func() { + Skip("This transition can not test because there is no spark-submit") + }) + }) + }) + }) }) func getDriverNamespacedName(appName string, appNamespace string) types.NamespacedName { diff --git a/internal/controller/sparkapplication/event_filter.go b/internal/controller/sparkapplication/event_filter.go index 121155f2..0968aa6c 100644 --- a/internal/controller/sparkapplication/event_filter.go +++ b/internal/controller/sparkapplication/event_filter.go @@ -167,9 +167,16 @@ func (f *EventFilter) Update(e event.UpdateEvent) bool { return false } - // The spec has changed. This is currently best effort as we can potentially miss updates - // and end up in an inconsistent state. + // The spec has changed except for Spec.Suspend. + // This is currently best effort as we can potentially miss updates and end up in an inconsistent state. if !equality.Semantic.DeepEqual(oldApp.Spec, newApp.Spec) { + + // Only Spec.Suspend can be updated + oldApp.Spec.Suspend = newApp.Spec.Suspend + if equality.Semantic.DeepEqual(oldApp.Spec, newApp.Spec) { + return true + } + // Force-set the application status to Invalidating which handles clean-up and application re-run. newApp.Status.AppState.State = v1beta2.ApplicationStateInvalidating logger.Info("Updating SparkApplication status", "name", newApp.Name, "namespace", newApp.Namespace, " oldState", oldApp.Status.AppState.State, "newState", newApp.Status.AppState.State) diff --git a/pkg/common/event.go b/pkg/common/event.go index 0c469ce9..00243c9b 100644 --- a/pkg/common/event.go +++ b/pkg/common/event.go @@ -29,6 +29,12 @@ const ( EventSparkApplicationFailed = "SparkApplicationFailed" EventSparkApplicationPendingRerun = "SparkApplicationPendingRerun" + + EventSparkApplicationSuspending = "SparkApplicationSuspending" + + EventSparkApplicationSuspended = "SparkApplicationSuspended" + + EventSparkApplicationResuming = "SparkApplicationResuming" ) // Spark driver events