diff --git a/api/v1beta2/sparkapplication_types.go b/api/v1beta2/sparkapplication_types.go index 674105b3..a332da54 100644 --- a/api/v1beta2/sparkapplication_types.go +++ b/api/v1beta2/sparkapplication_types.go @@ -703,6 +703,12 @@ type DynamicAllocation struct { // MaxExecutors is the upper bound for the number of executors if dynamic allocation is enabled. // +optional MaxExecutors *int32 `json:"maxExecutors,omitempty"` + // ShuffleTrackingEnabled enables shuffle file tracking for executors, which allows dynamic allocation without + // the need for an external shuffle service. This option will try to keep alive executors that are storing + // shuffle data for active jobs. If external shuffle service is enabled, set ShuffleTrackingEnabled to false. + // ShuffleTrackingEnabled is true by default if dynamicAllocation.enabled is true. + // +optional + ShuffleTrackingEnabled *bool `json:"shuffleTrackingEnabled,omitempty"` // ShuffleTrackingTimeout controls the timeout in milliseconds for executors that are holding // shuffle data if shuffle tracking is enabled (true by default if dynamic allocation is enabled). // +optional diff --git a/api/v1beta2/zz_generated.deepcopy.go b/api/v1beta2/zz_generated.deepcopy.go index 1b080e1f..0689a70d 100644 --- a/api/v1beta2/zz_generated.deepcopy.go +++ b/api/v1beta2/zz_generated.deepcopy.go @@ -279,6 +279,11 @@ func (in *DynamicAllocation) DeepCopyInto(out *DynamicAllocation) { *out = new(int32) **out = **in } + if in.ShuffleTrackingEnabled != nil { + in, out := &in.ShuffleTrackingEnabled, &out.ShuffleTrackingEnabled + *out = new(bool) + **out = **in + } if in.ShuffleTrackingTimeout != nil { in, out := &in.ShuffleTrackingTimeout, &out.ShuffleTrackingTimeout *out = new(int64) diff --git a/charts/spark-operator-chart/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml b/charts/spark-operator-chart/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml index 99a2fa09..0a2d9c7f 100644 --- a/charts/spark-operator-chart/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml +++ b/charts/spark-operator-chart/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml @@ -5296,6 +5296,13 @@ spec: of executors if dynamic allocation is enabled. format: int32 type: integer + shuffleTrackingEnabled: + description: |- + ShuffleTrackingEnabled enables shuffle file tracking for executors, which allows dynamic allocation without + the need for an external shuffle service. This option will try to keep alive executors that are storing + shuffle data for active jobs. If external shuffle service is enabled, set ShuffleTrackingEnabled to false. + ShuffleTrackingEnabled is true by default if dynamicAllocation.enabled is true. + type: boolean shuffleTrackingTimeout: description: |- ShuffleTrackingTimeout controls the timeout in milliseconds for executors that are holding diff --git a/charts/spark-operator-chart/crds/sparkoperator.k8s.io_sparkapplications.yaml b/charts/spark-operator-chart/crds/sparkoperator.k8s.io_sparkapplications.yaml index 62699343..c3d4c59e 100644 --- a/charts/spark-operator-chart/crds/sparkoperator.k8s.io_sparkapplications.yaml +++ b/charts/spark-operator-chart/crds/sparkoperator.k8s.io_sparkapplications.yaml @@ -5238,6 +5238,13 @@ spec: executors if dynamic allocation is enabled. format: int32 type: integer + shuffleTrackingEnabled: + description: |- + ShuffleTrackingEnabled enables shuffle file tracking for executors, which allows dynamic allocation without + the need for an external shuffle service. This option will try to keep alive executors that are storing + shuffle data for active jobs. If external shuffle service is enabled, set ShuffleTrackingEnabled to false. + ShuffleTrackingEnabled is true by default if dynamicAllocation.enabled is true. + type: boolean shuffleTrackingTimeout: description: |- ShuffleTrackingTimeout controls the timeout in milliseconds for executors that are holding diff --git a/config/crd/bases/sparkoperator.k8s.io_scheduledsparkapplications.yaml b/config/crd/bases/sparkoperator.k8s.io_scheduledsparkapplications.yaml index 99a2fa09..0a2d9c7f 100644 --- a/config/crd/bases/sparkoperator.k8s.io_scheduledsparkapplications.yaml +++ b/config/crd/bases/sparkoperator.k8s.io_scheduledsparkapplications.yaml @@ -5296,6 +5296,13 @@ spec: of executors if dynamic allocation is enabled. format: int32 type: integer + shuffleTrackingEnabled: + description: |- + ShuffleTrackingEnabled enables shuffle file tracking for executors, which allows dynamic allocation without + the need for an external shuffle service. This option will try to keep alive executors that are storing + shuffle data for active jobs. If external shuffle service is enabled, set ShuffleTrackingEnabled to false. + ShuffleTrackingEnabled is true by default if dynamicAllocation.enabled is true. + type: boolean shuffleTrackingTimeout: description: |- ShuffleTrackingTimeout controls the timeout in milliseconds for executors that are holding diff --git a/config/crd/bases/sparkoperator.k8s.io_sparkapplications.yaml b/config/crd/bases/sparkoperator.k8s.io_sparkapplications.yaml index 62699343..c3d4c59e 100644 --- a/config/crd/bases/sparkoperator.k8s.io_sparkapplications.yaml +++ b/config/crd/bases/sparkoperator.k8s.io_sparkapplications.yaml @@ -5238,6 +5238,13 @@ spec: executors if dynamic allocation is enabled. format: int32 type: integer + shuffleTrackingEnabled: + description: |- + ShuffleTrackingEnabled enables shuffle file tracking for executors, which allows dynamic allocation without + the need for an external shuffle service. This option will try to keep alive executors that are storing + shuffle data for active jobs. If external shuffle service is enabled, set ShuffleTrackingEnabled to false. + ShuffleTrackingEnabled is true by default if dynamicAllocation.enabled is true. + type: boolean shuffleTrackingTimeout: description: |- ShuffleTrackingTimeout controls the timeout in milliseconds for executors that are holding diff --git a/docs/api-docs.md b/docs/api-docs.md index 4c4196c6..b81759f3 100644 --- a/docs/api-docs.md +++ b/docs/api-docs.md @@ -741,6 +741,21 @@ int32 +shuffleTrackingEnabled
+ +bool + + + +(Optional) +

ShuffleTrackingEnabled enables shuffle file tracking for executors, which allows dynamic allocation without +the need for an external shuffle service. This option will try to keep alive executors that are storing +shuffle data for active jobs. If external shuffle service is enabled, set ShuffleTrackingEnabled to false. +ShuffleTrackingEnabled is true by default if dynamicAllocation.enabled is true.

+ + + + shuffleTrackingTimeout
int64 diff --git a/internal/controller/sparkapplication/submission.go b/internal/controller/sparkapplication/submission.go index 6a2a56b0..361610a6 100644 --- a/internal/controller/sparkapplication/submission.go +++ b/internal/controller/sparkapplication/submission.go @@ -989,10 +989,6 @@ func dynamicAllocationOption(app *v1beta2.SparkApplication) ([]string, error) { args = append(args, "--conf", fmt.Sprintf("%s=true", common.SparkDynamicAllocationEnabled)) - // Turn on shuffle tracking if dynamic allocation is enabled. - args = append(args, "--conf", - fmt.Sprintf("%s=true", common.SparkDynamicAllocationShuffleTrackingEnabled)) - if dynamicAllocation.InitialExecutors != nil { args = append(args, "--conf", fmt.Sprintf("%s=%d", common.SparkDynamicAllocationInitialExecutors, *dynamicAllocation.InitialExecutors)) @@ -1005,6 +1001,12 @@ func dynamicAllocationOption(app *v1beta2.SparkApplication) ([]string, error) { args = append(args, "--conf", fmt.Sprintf("%s=%d", common.SparkDynamicAllocationMaxExecutors, *dynamicAllocation.MaxExecutors)) } + shuffleTrackingEnabled := true + if dynamicAllocation.ShuffleTrackingEnabled != nil { + shuffleTrackingEnabled = *dynamicAllocation.ShuffleTrackingEnabled + } + args = append(args, "--conf", + fmt.Sprintf("%s=%t", common.SparkDynamicAllocationShuffleTrackingEnabled, shuffleTrackingEnabled)) if dynamicAllocation.ShuffleTrackingTimeout != nil { args = append(args, "--conf", fmt.Sprintf("%s=%d", common.SparkDynamicAllocationShuffleTrackingTimeout, *dynamicAllocation.ShuffleTrackingTimeout)) diff --git a/internal/webhook/sparkapplication_defaulter.go b/internal/webhook/sparkapplication_defaulter.go index 9c10ea10..9035cdee 100644 --- a/internal/webhook/sparkapplication_defaulter.go +++ b/internal/webhook/sparkapplication_defaulter.go @@ -18,7 +18,6 @@ package webhook import ( "context" - "strconv" "k8s.io/apimachinery/pkg/runtime" "sigs.k8s.io/controller-runtime/pkg/webhook/admission" @@ -86,17 +85,21 @@ func defaultDriverSpec(app *v1beta2.SparkApplication) { } func defaultExecutorSpec(app *v1beta2.SparkApplication) { - if app.Spec.Executor.Instances == nil { - // Check whether dynamic allocation is enabled in application spec. - enableDynamicAllocation := app.Spec.DynamicAllocation != nil && app.Spec.DynamicAllocation.Enabled - // Check whether dynamic allocation is enabled in spark conf. - if !enableDynamicAllocation && app.Spec.SparkConf != nil { - if dynamicConf, _ := strconv.ParseBool(app.Spec.SparkConf[common.SparkDynamicAllocationEnabled]); dynamicConf { - enableDynamicAllocation = true - } - if !enableDynamicAllocation && app.Spec.SparkConf[common.SparkExecutorInstances] == "" { - app.Spec.Executor.Instances = util.Int32Ptr(1) - } - } + + isDynamicAllocationEnabled := util.IsDynamicAllocationEnabled(app) + + if app.Spec.Executor.Instances == nil && + app.Spec.SparkConf[common.SparkExecutorInstances] == "" && + !isDynamicAllocationEnabled { + app.Spec.Executor.Instances = util.Int32Ptr(1) } + + // Set default for ShuffleTrackingEnabled to true if DynamicAllocation.enabled is true and + // DynamicAllocation.ShuffleTrackingEnabled is nil. + if isDynamicAllocationEnabled && + app.Spec.DynamicAllocation != nil && + app.Spec.DynamicAllocation.ShuffleTrackingEnabled == nil { + app.Spec.DynamicAllocation.ShuffleTrackingEnabled = util.BoolPtr(true) + } + } diff --git a/pkg/util/sparkapplication.go b/pkg/util/sparkapplication.go index 65bdb269..35feb129 100644 --- a/pkg/util/sparkapplication.go +++ b/pkg/util/sparkapplication.go @@ -20,6 +20,7 @@ import ( "crypto/md5" "fmt" "reflect" + "strconv" "strings" "time" @@ -495,3 +496,13 @@ func GetInitialExecutorNumber(app *v1beta2.SparkApplication) int32 { return initialNumExecutors } + +// IsDynamicAllocationEnabled determines if Spark Dynamic Allocation is enabled in app.Spec.DynamicAllocation or in +// app.Spec.SparkConf. app.Spec.DynamicAllocation configs will take precedence over app.Spec.SparkConf configs. +func IsDynamicAllocationEnabled(app *v1beta2.SparkApplication) bool { + if app.Spec.DynamicAllocation != nil { + return app.Spec.DynamicAllocation.Enabled + } + dynamicAllocationConfVal, _ := strconv.ParseBool(app.Spec.SparkConf[common.SparkDynamicAllocationEnabled]) + return dynamicAllocationConfVal +} diff --git a/pkg/util/sparkapplication_test.go b/pkg/util/sparkapplication_test.go index 3aaa57e8..ea99e97e 100644 --- a/pkg/util/sparkapplication_test.go +++ b/pkg/util/sparkapplication_test.go @@ -356,3 +356,69 @@ var _ = Describe("DriverStateToApplicationState", func() { Expect(util.DriverStateToApplicationState(v1beta2.DriverStateUnknown)).To(Equal(v1beta2.ApplicationStateUnknown)) }) }) + +var _ = Describe("Check if IsDynamicAllocationEnabled", func() { + Context("when app.Spec.DynamicAllocation is True", func() { + app := &v1beta2.SparkApplication{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-app", + Namespace: "test-namespace", + }, + Spec: v1beta2.SparkApplicationSpec{ + DynamicAllocation: &v1beta2.DynamicAllocation{ + Enabled: true, + }, + }, + } + It("Should return true", func() { + Expect(util.IsDynamicAllocationEnabled(app)).To(BeTrue()) + }) + }) + Context("when app.Spec.DynamicAllocation is nil but True in app.Spec.SparkConf", func() { + app := &v1beta2.SparkApplication{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-app", + Namespace: "test-namespace", + }, + Spec: v1beta2.SparkApplicationSpec{ + SparkConf: map[string]string{ + "spark.dynamicAllocation.enabled": "true", + }, + }, + } + It("Should return true", func() { + Expect(util.IsDynamicAllocationEnabled(app)).To(BeTrue()) + }) + }) + Context("when app.Spec.DynamicAllocation is nil and not set in app.Spec.SparkConf", func() { + app := &v1beta2.SparkApplication{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-app", + Namespace: "test-namespace", + }, + } + It("Should return false", func() { + Expect(util.IsDynamicAllocationEnabled(app)).To(BeFalse()) + }) + }) + Context("when app.Spec.DynamicAllocation is True but false in app.Spec.SparkConf", func() { + app := &v1beta2.SparkApplication{ + ObjectMeta: metav1.ObjectMeta{ + Name: "test-app", + Namespace: "test-namespace", + }, + Spec: v1beta2.SparkApplicationSpec{ + DynamicAllocation: &v1beta2.DynamicAllocation{ + Enabled: true, + }, + SparkConf: map[string]string{ + "spark.dynamicAllocation.enabled": "false", + }, + }, + } + It("Should return true because app.Spec.DynamicAllocation configs will take precedence over "+ + "app.Spec.SparkConf configs", func() { + Expect(util.IsDynamicAllocationEnabled(app)).To(BeTrue()) + }) + }) +})