525 lines
22 KiB
Go
525 lines
22 KiB
Go
/*
|
|
Copyright 2018 Google LLC
|
|
|
|
Licensed under the Apache License, Version 2.0 (the "License");
|
|
you may not use this file except in compliance with the License.
|
|
You may obtain a copy of the License at
|
|
|
|
https://www.apache.org/licenses/LICENSE-2.0
|
|
|
|
Unless required by applicable law or agreed to in writing, software
|
|
distributed under the License is distributed on an "AS IS" BASIS,
|
|
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
See the License for the specific language governing permissions and
|
|
limitations under the License.
|
|
*/
|
|
|
|
package scheduledsparkapplication
|
|
|
|
import (
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/stretchr/testify/assert"
|
|
|
|
apiextensionsfake "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset/fake"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/util/clock"
|
|
kubeclientfake "k8s.io/client-go/kubernetes/fake"
|
|
kubetesting "k8s.io/client-go/testing"
|
|
"k8s.io/client-go/tools/cache"
|
|
|
|
"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
|
|
crdclientfake "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/clientset/versioned/fake"
|
|
crdinformers "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/client/informers/externalversions"
|
|
"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/config"
|
|
)
|
|
|
|
func TestSyncScheduledSparkApplication_Allow(t *testing.T) {
|
|
app := &v1beta2.ScheduledSparkApplication{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: "default",
|
|
Name: "test-app-allow",
|
|
},
|
|
Spec: v1beta2.ScheduledSparkApplicationSpec{
|
|
Schedule: "@every 10m",
|
|
ConcurrencyPolicy: v1beta2.ConcurrencyAllow,
|
|
},
|
|
}
|
|
c, clk := newFakeController()
|
|
c.crdClient.SparkoperatorV1beta2().ScheduledSparkApplications(app.Namespace).Create(app)
|
|
|
|
key, _ := cache.MetaNamespaceKeyFunc(app)
|
|
options := metav1.GetOptions{}
|
|
|
|
if err := c.syncScheduledSparkApplication(key); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
app, _ = c.crdClient.SparkoperatorV1beta2().ScheduledSparkApplications(app.Namespace).Get(app.Name, options)
|
|
assert.Equal(t, v1beta2.ScheduledState, app.Status.ScheduleState)
|
|
// The first run should not have been triggered.
|
|
assert.True(t, app.Status.LastRunName == "")
|
|
|
|
// Advance the clock by 10 minutes.
|
|
clk.Step(10 * time.Minute)
|
|
if err := c.syncScheduledSparkApplication(key); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
app, _ = c.crdClient.SparkoperatorV1beta2().ScheduledSparkApplications(app.Namespace).Get(app.Name, options)
|
|
firstRunName := app.Status.LastRunName
|
|
// The first run should have been triggered.
|
|
assert.True(t, firstRunName != "")
|
|
assert.False(t, app.Status.LastRun.IsZero())
|
|
assert.True(t, app.Status.NextRun.Time.After(app.Status.LastRun.Time))
|
|
// The first run exists.
|
|
run, _ := c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(firstRunName, options)
|
|
assert.NotNil(t, run)
|
|
|
|
clk.Step(5 * time.Second)
|
|
// The second sync should not start any new run.
|
|
if err := c.syncScheduledSparkApplication(key); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
app, _ = c.crdClient.SparkoperatorV1beta2().ScheduledSparkApplications(app.Namespace).Get(app.Name, options)
|
|
// Next run is not due, so LastRunName should stay the same.
|
|
assert.Equal(t, firstRunName, app.Status.LastRunName)
|
|
|
|
// Simulate completion of the first run.
|
|
run.Status.AppState.State = v1beta2.CompletedState
|
|
c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Update(run)
|
|
// This sync should not start any new run, but update Status.PastSuccessfulRunNames.
|
|
if err := c.syncScheduledSparkApplication(key); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
app, _ = c.crdClient.SparkoperatorV1beta2().ScheduledSparkApplications(app.Namespace).Get(app.Name, options)
|
|
assert.Equal(t, 1, len(app.Status.PastSuccessfulRunNames))
|
|
assert.Equal(t, firstRunName, app.Status.PastSuccessfulRunNames[0])
|
|
run, _ = c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(firstRunName, options)
|
|
assert.NotNil(t, run)
|
|
|
|
// This sync should not start any new run, nor update Status.PastSuccessfulRunNames.
|
|
if err := c.syncScheduledSparkApplication(key); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
app, _ = c.crdClient.SparkoperatorV1beta2().ScheduledSparkApplications(app.Namespace).Get(app.Name, options)
|
|
assert.Equal(t, 1, len(app.Status.PastSuccessfulRunNames))
|
|
assert.Equal(t, firstRunName, app.Status.PastSuccessfulRunNames[0])
|
|
run, _ = c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(firstRunName, options)
|
|
assert.NotNil(t, run)
|
|
|
|
// Advance the clock to trigger the second run.
|
|
clk.SetTime(app.Status.NextRun.Time.Add(5 * time.Second))
|
|
// This sync should start the second run.
|
|
if err := c.syncScheduledSparkApplication(key); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
app, _ = c.crdClient.SparkoperatorV1beta2().ScheduledSparkApplications(app.Namespace).Get(app.Name, options)
|
|
assert.Equal(t, v1beta2.ScheduledState, app.Status.ScheduleState)
|
|
// The second run should have a different name.
|
|
secondRunName := app.Status.LastRunName
|
|
assert.NotEqual(t, firstRunName, secondRunName)
|
|
assert.True(t, app.Status.NextRun.Time.After(app.Status.LastRun.Time))
|
|
// The second run exists.
|
|
run, _ = c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(secondRunName, options)
|
|
assert.NotNil(t, run)
|
|
|
|
// Simulate completion of the second run.
|
|
run.Status.AppState.State = v1beta2.CompletedState
|
|
c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Update(run)
|
|
// This sync should not start any new run, but update Status.PastSuccessfulRunNames.
|
|
if err := c.syncScheduledSparkApplication(key); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
app, _ = c.crdClient.SparkoperatorV1beta2().ScheduledSparkApplications(app.Namespace).Get(app.Name, options)
|
|
assert.Equal(t, 1, len(app.Status.PastSuccessfulRunNames))
|
|
// The first run should have been deleted due to the completion of the second run.
|
|
firstRun, _ := c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(firstRunName, options)
|
|
assert.Nil(t, firstRun)
|
|
|
|
// This sync should not start any new run, nor update Status.PastSuccessfulRunNames.
|
|
if err := c.syncScheduledSparkApplication(key); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
app, _ = c.crdClient.SparkoperatorV1beta2().ScheduledSparkApplications(app.Namespace).Get(app.Name, options)
|
|
assert.Equal(t, 1, len(app.Status.PastSuccessfulRunNames))
|
|
run, _ = c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(secondRunName, options)
|
|
assert.NotNil(t, run)
|
|
|
|
// Test the case where we update the schedule to be more frequent
|
|
app.Spec.Schedule = "@every 2m"
|
|
recentRunName := app.Status.LastRunName
|
|
recentRunTime := app.Status.LastRun.Time
|
|
app, _ = c.crdClient.SparkoperatorV1beta2().ScheduledSparkApplications(app.Namespace).Update(app)
|
|
// sync our update
|
|
if err := c.syncScheduledSparkApplication(key); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
// Advance the clock by 3 minutes.
|
|
clk.Step(3 * time.Minute)
|
|
if err := c.syncScheduledSparkApplication(key); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
app, _ = c.crdClient.SparkoperatorV1beta2().ScheduledSparkApplications(app.Namespace).Get(app.Name, options)
|
|
// A run should have been triggered
|
|
assert.NotEqual(t, recentRunName, app.Status.LastRunName)
|
|
assert.True(t, recentRunTime.Before(app.Status.LastRun.Time))
|
|
run, _ = c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(app.Status.LastRunName, options)
|
|
assert.NotNil(t, run)
|
|
// Simulate completion of the last run
|
|
run.Status.AppState.State = v1beta2.CompletedState
|
|
c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Update(run)
|
|
// This sync should not start any new run, but update Status.PastSuccessfulRunNames.
|
|
if err := c.syncScheduledSparkApplication(key); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
}
|
|
|
|
func TestSyncScheduledSparkApplication_Forbid(t *testing.T) {
|
|
app := &v1beta2.ScheduledSparkApplication{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: "default",
|
|
Name: "test-app-forbid",
|
|
},
|
|
Spec: v1beta2.ScheduledSparkApplicationSpec{
|
|
Schedule: "@every 1m",
|
|
ConcurrencyPolicy: v1beta2.ConcurrencyForbid,
|
|
},
|
|
}
|
|
c, clk := newFakeController()
|
|
c.crdClient.SparkoperatorV1beta2().ScheduledSparkApplications(app.Namespace).Create(app)
|
|
|
|
key, _ := cache.MetaNamespaceKeyFunc(app)
|
|
options := metav1.GetOptions{}
|
|
|
|
if err := c.syncScheduledSparkApplication(key); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
app, _ = c.crdClient.SparkoperatorV1beta2().ScheduledSparkApplications(app.Namespace).Get(app.Name, options)
|
|
assert.Equal(t, v1beta2.ScheduledState, app.Status.ScheduleState)
|
|
// The first run should not have been triggered.
|
|
assert.True(t, app.Status.LastRunName == "")
|
|
|
|
// Advance the clock by 1 minute.
|
|
clk.Step(1 * time.Minute)
|
|
if err := c.syncScheduledSparkApplication(key); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
app, _ = c.crdClient.SparkoperatorV1beta2().ScheduledSparkApplications(app.Namespace).Get(app.Name, options)
|
|
assert.Equal(t, v1beta2.ScheduledState, app.Status.ScheduleState)
|
|
firstRunName := app.Status.LastRunName
|
|
// The first run should have been triggered.
|
|
assert.True(t, firstRunName != "")
|
|
assert.False(t, app.Status.LastRun.IsZero())
|
|
assert.True(t, app.Status.NextRun.Time.After(app.Status.LastRun.Time))
|
|
// The first run exists.
|
|
run, _ := c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(firstRunName, options)
|
|
assert.NotNil(t, run)
|
|
|
|
clk.SetTime(app.Status.NextRun.Time.Add(5 * time.Second))
|
|
// This sync should not start the next run because the first run has not completed yet.
|
|
if err := c.syncScheduledSparkApplication(key); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
app, _ = c.crdClient.SparkoperatorV1beta2().ScheduledSparkApplications(app.Namespace).Get(app.Name, options)
|
|
assert.Equal(t, firstRunName, app.Status.LastRunName)
|
|
|
|
// Simulate completion of the first run.
|
|
run.Status.AppState.State = v1beta2.CompletedState
|
|
c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Update(run)
|
|
// This sync should start the next run because the first run has completed.
|
|
if err := c.syncScheduledSparkApplication(key); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
app, _ = c.crdClient.SparkoperatorV1beta2().ScheduledSparkApplications(app.Namespace).Get(app.Name, options)
|
|
secondRunName := app.Status.LastRunName
|
|
assert.NotEqual(t, firstRunName, secondRunName)
|
|
assert.Equal(t, 1, len(app.Status.PastSuccessfulRunNames))
|
|
assert.Equal(t, firstRunName, app.Status.PastSuccessfulRunNames[0])
|
|
// The second run exists.
|
|
run, _ = c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(secondRunName, options)
|
|
assert.NotNil(t, run)
|
|
}
|
|
|
|
func TestSyncScheduledSparkApplication_Replace(t *testing.T) {
|
|
// TODO: figure out why the test fails and remove this.
|
|
t.Skip()
|
|
|
|
app := &v1beta2.ScheduledSparkApplication{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: "default",
|
|
Name: "test-app-replace",
|
|
},
|
|
Spec: v1beta2.ScheduledSparkApplicationSpec{
|
|
Schedule: "@every 1m",
|
|
ConcurrencyPolicy: v1beta2.ConcurrencyReplace,
|
|
},
|
|
}
|
|
c, clk := newFakeController()
|
|
c.crdClient.SparkoperatorV1beta2().ScheduledSparkApplications(app.Namespace).Create(app)
|
|
key, _ := cache.MetaNamespaceKeyFunc(app)
|
|
|
|
options := metav1.GetOptions{}
|
|
|
|
if err := c.syncScheduledSparkApplication(key); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
app, _ = c.crdClient.SparkoperatorV1beta2().ScheduledSparkApplications(app.Namespace).Get(app.Name, options)
|
|
assert.Equal(t, v1beta2.ScheduledState, app.Status.ScheduleState)
|
|
// The first run should not have been triggered.
|
|
assert.True(t, app.Status.LastRunName == "")
|
|
|
|
// Advance the clock by 1 minute.
|
|
clk.Step(1 * time.Minute)
|
|
if err := c.syncScheduledSparkApplication(key); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
app, _ = c.crdClient.SparkoperatorV1beta2().ScheduledSparkApplications(app.Namespace).Get(app.Name, options)
|
|
assert.Equal(t, v1beta2.ScheduledState, app.Status.ScheduleState)
|
|
firstRunName := app.Status.LastRunName
|
|
// The first run should have been triggered.
|
|
assert.True(t, firstRunName != "")
|
|
assert.False(t, app.Status.LastRun.IsZero())
|
|
assert.True(t, app.Status.NextRun.Time.After(app.Status.LastRun.Time))
|
|
// The first run exists.
|
|
run, _ := c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(firstRunName, options)
|
|
assert.NotNil(t, run)
|
|
|
|
clk.SetTime(app.Status.NextRun.Time.Add(5 * time.Second))
|
|
// This sync should replace the first run with a new run.
|
|
if err := c.syncScheduledSparkApplication(key); err != nil {
|
|
t.Fatal(err)
|
|
}
|
|
app, _ = c.crdClient.SparkoperatorV1beta2().ScheduledSparkApplications(app.Namespace).Get(app.Name, options)
|
|
secondRunName := app.Status.LastRunName
|
|
assert.NotEqual(t, firstRunName, secondRunName)
|
|
// The first run should have been deleted.
|
|
run, _ = c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(firstRunName, options)
|
|
assert.Nil(t, run)
|
|
// The second run exists.
|
|
run, _ = c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(secondRunName, options)
|
|
assert.NotNil(t, run)
|
|
}
|
|
|
|
func TestShouldStartNextRun(t *testing.T) {
|
|
app := &v1beta2.ScheduledSparkApplication{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: "default",
|
|
Name: "test-app",
|
|
},
|
|
Spec: v1beta2.ScheduledSparkApplicationSpec{
|
|
Schedule: "@every 1m",
|
|
},
|
|
Status: v1beta2.ScheduledSparkApplicationStatus{
|
|
LastRunName: "run1",
|
|
},
|
|
}
|
|
c, _ := newFakeController()
|
|
c.crdClient.SparkoperatorV1beta2().ScheduledSparkApplications(app.Namespace).Create(app)
|
|
|
|
run1 := &v1beta2.SparkApplication{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: app.Namespace,
|
|
Name: "run1",
|
|
Labels: map[string]string{config.ScheduledSparkAppNameLabel: app.Name},
|
|
},
|
|
}
|
|
c.crdClient.SparkoperatorV1beta2().SparkApplications(run1.Namespace).Create(run1)
|
|
|
|
// ConcurrencyAllow with a running run.
|
|
run1.Status.AppState.State = v1beta2.RunningState
|
|
c.crdClient.SparkoperatorV1beta2().SparkApplications(run1.Namespace).Update(run1)
|
|
app.Spec.ConcurrencyPolicy = v1beta2.ConcurrencyAllow
|
|
ok, _ := c.shouldStartNextRun(app)
|
|
assert.True(t, ok)
|
|
|
|
// ConcurrencyForbid with a running run.
|
|
app.Spec.ConcurrencyPolicy = v1beta2.ConcurrencyForbid
|
|
ok, _ = c.shouldStartNextRun(app)
|
|
assert.False(t, ok)
|
|
// ConcurrencyForbid with a completed run.
|
|
run1.Status.AppState.State = v1beta2.CompletedState
|
|
c.crdClient.SparkoperatorV1beta2().SparkApplications(run1.Namespace).Update(run1)
|
|
ok, _ = c.shouldStartNextRun(app)
|
|
assert.True(t, ok)
|
|
|
|
// ConcurrencyReplace with a completed run.
|
|
app.Spec.ConcurrencyPolicy = v1beta2.ConcurrencyReplace
|
|
ok, _ = c.shouldStartNextRun(app)
|
|
assert.True(t, ok)
|
|
// ConcurrencyReplace with a running run.
|
|
run1.Status.AppState.State = v1beta2.RunningState
|
|
c.crdClient.SparkoperatorV1beta2().SparkApplications(run1.Namespace).Update(run1)
|
|
ok, _ = c.shouldStartNextRun(app)
|
|
assert.True(t, ok)
|
|
// The previous running run should have been deleted.
|
|
existing, _ := c.crdClient.SparkoperatorV1beta2().SparkApplications(run1.Namespace).Get(run1.Name,
|
|
metav1.GetOptions{})
|
|
assert.Nil(t, existing)
|
|
}
|
|
|
|
func TestCheckAndUpdatePastRuns(t *testing.T) {
|
|
var two int32 = 2
|
|
app := &v1beta2.ScheduledSparkApplication{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: "default",
|
|
Name: "test-app",
|
|
},
|
|
Spec: v1beta2.ScheduledSparkApplicationSpec{
|
|
Schedule: "@every 1m",
|
|
SuccessfulRunHistoryLimit: &two,
|
|
FailedRunHistoryLimit: &two,
|
|
},
|
|
}
|
|
c, _ := newFakeController()
|
|
|
|
run1 := &v1beta2.SparkApplication{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Namespace: app.Namespace,
|
|
Name: "run1",
|
|
Labels: map[string]string{config.ScheduledSparkAppNameLabel: app.Name},
|
|
},
|
|
Status: v1beta2.SparkApplicationStatus{
|
|
AppState: v1beta2.ApplicationState{
|
|
State: v1beta2.CompletedState,
|
|
},
|
|
},
|
|
}
|
|
c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Create(run1)
|
|
|
|
// The first completed run should have been recorded.
|
|
status := app.Status.DeepCopy()
|
|
c.checkAndUpdatePastRuns(app, status)
|
|
assert.Equal(t, 1, len(status.PastSuccessfulRunNames))
|
|
assert.Equal(t, run1.Name, status.PastSuccessfulRunNames[0])
|
|
|
|
// The second run that is running should not be recorded.
|
|
run2 := run1.DeepCopy()
|
|
run2.CreationTimestamp.Time = run1.CreationTimestamp.Add(10 * time.Second)
|
|
run2.Name = "run2"
|
|
run2.Status.AppState.State = v1beta2.RunningState
|
|
c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Create(run2)
|
|
c.checkAndUpdatePastRuns(app, status)
|
|
assert.Equal(t, 1, len(status.PastSuccessfulRunNames))
|
|
assert.Equal(t, run1.Name, status.PastSuccessfulRunNames[0])
|
|
// The second completed run should have been recorded.
|
|
run2.Status.AppState.State = v1beta2.CompletedState
|
|
c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Update(run2)
|
|
c.checkAndUpdatePastRuns(app, status)
|
|
assert.Equal(t, 2, len(status.PastSuccessfulRunNames))
|
|
assert.Equal(t, run2.Name, status.PastSuccessfulRunNames[0])
|
|
assert.Equal(t, run1.Name, status.PastSuccessfulRunNames[1])
|
|
// The second completed run has already been recorded, so should not be recorded again.
|
|
c.checkAndUpdatePastRuns(app, status)
|
|
assert.Equal(t, 2, len(status.PastSuccessfulRunNames))
|
|
assert.Equal(t, run2.Name, status.PastSuccessfulRunNames[0])
|
|
assert.Equal(t, run1.Name, status.PastSuccessfulRunNames[1])
|
|
// SparkApplications of both of the first two completed runs should exist.
|
|
existing, _ := c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(run2.Name,
|
|
metav1.GetOptions{})
|
|
assert.NotNil(t, existing)
|
|
existing, _ = c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(run1.Name,
|
|
metav1.GetOptions{})
|
|
assert.NotNil(t, existing)
|
|
|
|
// The third completed run should have been recorded.
|
|
run3 := run1.DeepCopy()
|
|
run3.CreationTimestamp.Time = run2.CreationTimestamp.Add(10 * time.Second)
|
|
run3.Name = "run3"
|
|
c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Create(run3)
|
|
c.checkAndUpdatePastRuns(app, status)
|
|
assert.Equal(t, 2, len(status.PastSuccessfulRunNames))
|
|
assert.Equal(t, run3.Name, status.PastSuccessfulRunNames[0])
|
|
assert.Equal(t, run2.Name, status.PastSuccessfulRunNames[1])
|
|
// SparkApplications of the last two completed runs should still exist,
|
|
// but the one of the first completed run should have been deleted.
|
|
existing, _ = c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(run3.Name,
|
|
metav1.GetOptions{})
|
|
assert.NotNil(t, existing)
|
|
existing, _ = c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(run2.Name,
|
|
metav1.GetOptions{})
|
|
assert.NotNil(t, existing)
|
|
existing, _ = c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(run1.Name,
|
|
metav1.GetOptions{})
|
|
assert.Nil(t, existing)
|
|
|
|
// The first failed run should have been recorded.
|
|
run4 := run1.DeepCopy()
|
|
run4.CreationTimestamp.Time = run3.CreationTimestamp.Add(10 * time.Second)
|
|
run4.Name = "run4"
|
|
run4.Status.AppState.State = v1beta2.FailedState
|
|
c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Create(run4)
|
|
c.checkAndUpdatePastRuns(app, status)
|
|
assert.Equal(t, 1, len(status.PastFailedRunNames))
|
|
assert.Equal(t, run4.Name, status.PastFailedRunNames[0])
|
|
|
|
// The second failed run should have been recorded.
|
|
run5 := run1.DeepCopy()
|
|
run5.CreationTimestamp.Time = run4.CreationTimestamp.Add(10 * time.Second)
|
|
run5.Name = "run5"
|
|
run5.Status.AppState.State = v1beta2.FailedState
|
|
c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Create(run5)
|
|
c.checkAndUpdatePastRuns(app, status)
|
|
assert.Equal(t, 2, len(status.PastFailedRunNames))
|
|
assert.Equal(t, run5.Name, status.PastFailedRunNames[0])
|
|
assert.Equal(t, run4.Name, status.PastFailedRunNames[1])
|
|
|
|
// The third failed run should have been recorded.
|
|
run6 := run1.DeepCopy()
|
|
run6.CreationTimestamp.Time = run5.CreationTimestamp.Add(10 * time.Second)
|
|
run6.Name = "run6"
|
|
run6.Status.AppState.State = v1beta2.FailedState
|
|
c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Create(run6)
|
|
c.checkAndUpdatePastRuns(app, status)
|
|
assert.Equal(t, 2, len(status.PastFailedRunNames))
|
|
assert.Equal(t, run6.Name, status.PastFailedRunNames[0])
|
|
assert.Equal(t, run5.Name, status.PastFailedRunNames[1])
|
|
// SparkApplications of the last two failed runs should still exist,
|
|
// but the one of the first failed run should have been deleted.
|
|
existing, _ = c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(run6.Name,
|
|
metav1.GetOptions{})
|
|
assert.NotNil(t, existing)
|
|
existing, _ = c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(run5.Name,
|
|
metav1.GetOptions{})
|
|
assert.NotNil(t, existing)
|
|
existing, _ = c.crdClient.SparkoperatorV1beta2().SparkApplications(app.Namespace).Get(run4.Name,
|
|
metav1.GetOptions{})
|
|
assert.Nil(t, existing)
|
|
}
|
|
|
|
func newFakeController() (*Controller, *clock.FakeClock) {
|
|
crdClient := crdclientfake.NewSimpleClientset()
|
|
kubeClient := kubeclientfake.NewSimpleClientset()
|
|
apiExtensionsClient := apiextensionsfake.NewSimpleClientset()
|
|
informerFactory := crdinformers.NewSharedInformerFactory(crdClient, 1*time.Second)
|
|
clk := clock.NewFakeClock(time.Now())
|
|
controller := NewController(crdClient, kubeClient, apiExtensionsClient, informerFactory, clk)
|
|
ssaInformer := informerFactory.Sparkoperator().V1beta2().ScheduledSparkApplications().Informer()
|
|
saInformer := informerFactory.Sparkoperator().V1beta2().SparkApplications().Informer()
|
|
crdClient.PrependReactor("create", "scheduledsparkapplications",
|
|
func(action kubetesting.Action) (bool, runtime.Object, error) {
|
|
obj := action.(kubetesting.CreateAction).GetObject()
|
|
ssaInformer.GetStore().Add(obj)
|
|
return false, obj, nil
|
|
})
|
|
crdClient.PrependReactor("update", "scheduledsparkapplications",
|
|
func(action kubetesting.Action) (bool, runtime.Object, error) {
|
|
obj := action.(kubetesting.UpdateAction).GetObject()
|
|
ssaInformer.GetStore().Update(obj)
|
|
return false, obj, nil
|
|
})
|
|
crdClient.PrependReactor("create", "sparkapplications",
|
|
func(action kubetesting.Action) (bool, runtime.Object, error) {
|
|
obj := action.(kubetesting.CreateAction).GetObject()
|
|
saInformer.GetStore().Add(obj)
|
|
return false, obj, nil
|
|
})
|
|
crdClient.PrependReactor("update", "sparkapplications",
|
|
func(action kubetesting.Action) (bool, runtime.Object, error) {
|
|
obj := action.(kubetesting.UpdateAction).GetObject()
|
|
saInformer.GetStore().Update(obj)
|
|
return false, obj, nil
|
|
})
|
|
return controller, clk
|
|
}
|