diff --git a/docs/api-docs.md b/docs/api-docs.md
index 0cd97d0c..a0ff575a 100644
--- a/docs/api-docs.md
+++ b/docs/api-docs.md
@@ -586,6 +586,21 @@ SparkUIConfiguration
SparkUIOptions allows configuring the Service and the Ingress to expose the sparkUI
+
+
+dynamicAllocation
+
+DynamicAllocation
+
+(Optional)
+DynamicAllocation configures dynamic allocation, which has been available for the Kubernetes
+scheduler backend since Spark 3.0.
+
@@ -969,6 +984,86 @@ executors to connect to the driver.
DriverState tells the current state of a spark driver.
+DynamicAllocation
+
+(Appears on:
+SparkApplicationSpec)
+
+DynamicAllocation contains configuration options for dynamic allocation.
+
+| Field | Description |
+| ----- | ----------- |
+| `enabled` (bool) | Enabled controls whether dynamic allocation is enabled or not. |
+| `initialExecutors` (int32) | (Optional) InitialExecutors is the initial number of executors to request. If `.spec.executor.instances` is also set, the initial number of executors is set to the bigger of that and this option. |
+| `minExecutors` (int32) | (Optional) MinExecutors is the lower bound for the number of executors if dynamic allocation is enabled. |
+| `maxExecutors` (int32) | (Optional) MaxExecutors is the upper bound for the number of executors if dynamic allocation is enabled. |
+| `shuffleTrackingTimeout` (int64) | (Optional) ShuffleTrackingTimeout controls the timeout in milliseconds for executors that are holding shuffle data if shuffle tracking is enabled (true by default if dynamic allocation is enabled). |
+
ExecutorSpec
@@ -2072,6 +2167,21 @@ SparkUIConfiguration
SparkUIOptions allows configuring the Service and the Ingress to expose the sparkUI
+
+
+dynamicAllocation
+
+DynamicAllocation
+
+(Optional)
+DynamicAllocation configures dynamic allocation, which has been available for the Kubernetes
+scheduler backend since Spark 3.0.
+
SparkApplicationStatus
@@ -2582,7 +2692,7 @@ string
SparkApplicationSpec)
-Specific SparkUI config parameters
+SparkUIConfiguration is for driver UI-specific configuration parameters.
@@ -2636,5 +2746,5 @@ map[string]string
Generated with gen-crd-api-reference-docs
-on git commit f313873.
+on git commit 555c27a.
diff --git a/docs/user-guide.md b/docs/user-guide.md
index ed39a523..5c8fab99 100644
--- a/docs/user-guide.md
+++ b/docs/user-guide.md
@@ -34,6 +34,7 @@ The Kubernetes Operator for Apache Spark ships with a command-line tool called `
* [Using Container LifeCycle Hooks](#using-container-lifecycle-hooks)
* [Python Support](#python-support)
* [Monitoring](#monitoring)
+ * [Dynamic Allocation](#dynamic-allocation)
* [Working with SparkApplications](#working-with-sparkapplications)
* [Creating a New SparkApplication](#creating-a-new-sparkapplication)
* [Deleting a SparkApplication](#deleting-a-sparkapplication)
@@ -638,6 +639,21 @@ spec:
The operator automatically adds the annotations such as `prometheus.io/scrape=true` on the driver and/or executor pods (depending on the values of `.spec.monitoring.exposeDriverMetrics` and `.spec.monitoring.exposeExecutorMetrics`) so the metrics exposed on the pods can be scraped by the Prometheus server in the same cluster.
+### Dynamic Allocation
+
+The operator supports a limited form of [Spark Dynamic Resource Allocation](http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation) through the shuffle tracking enhancement introduced in Spark 3.0.0, *without needing an external shuffle service* (which is not available in Kubernetes mode). See this [issue](https://issues.apache.org/jira/browse/SPARK-27963) for details on the enhancement. To enable this limited form of dynamic allocation, follow the example below:
+
+```yaml
+spec:
+ dynamicAllocation:
+ enabled: true
+ initialExecutors: 2
+ minExecutors: 2
+ maxExecutors: 10
+```
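+
+The operator translates these settings into the corresponding `spark.dynamicAllocation.*` configuration properties passed to `spark-submit`, and turns on `spark.dynamicAllocation.shuffleTracking.enabled` automatically when dynamic allocation is enabled. The optional field `.spec.dynamicAllocation.shuffleTrackingTimeout` controls the timeout, in milliseconds, for executors that are holding shuffle data.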
+
+Note that if dynamic allocation is enabled and both `.spec.dynamicAllocation.initialExecutors` and `.spec.executor.instances` are set, the initial number of executors to request is set to the bigger of the two, as illustrated below.
+
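+For example, the following (hypothetical) combination of settings results in 5 initial executors, since `.spec.executor.instances` is the bigger of the two values:
+
+```yaml
+spec:
+  executor:
+    instances: 5
+  dynamicAllocation:
+    enabled: true
+    initialExecutors: 2
+    minExecutors: 2
+    maxExecutors: 10
+```
+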
## Working with SparkApplications
### Creating a New SparkApplication
diff --git a/manifest/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml b/manifest/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml
index 2a9ce431..de4aef14 100644
--- a/manifest/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml
+++ b/manifest/crds/sparkoperator.k8s.io_scheduledsparkapplications.yaml
@@ -1800,6 +1800,23 @@ spec:
type: object
type: array
type: object
+ dynamicAllocation:
+ properties:
+ enabled:
+ type: boolean
+ initialExecutors:
+ format: int32
+ type: integer
+ maxExecutors:
+ format: int32
+ type: integer
+ minExecutors:
+ format: int32
+ type: integer
+ shuffleTrackingTimeout:
+ format: int64
+ type: integer
+ type: object
executor:
properties:
affinity:
diff --git a/manifest/crds/sparkoperator.k8s.io_sparkapplications.yaml b/manifest/crds/sparkoperator.k8s.io_sparkapplications.yaml
index 9feda0e9..e5c8f2d8 100644
--- a/manifest/crds/sparkoperator.k8s.io_sparkapplications.yaml
+++ b/manifest/crds/sparkoperator.k8s.io_sparkapplications.yaml
@@ -1786,6 +1786,23 @@ spec:
type: object
type: array
type: object
+ dynamicAllocation:
+ properties:
+ enabled:
+ type: boolean
+ initialExecutors:
+ format: int32
+ type: integer
+ maxExecutors:
+ format: int32
+ type: integer
+ minExecutors:
+ format: int32
+ type: integer
+ shuffleTrackingTimeout:
+ format: int64
+ type: integer
+ type: object
executor:
properties:
affinity:
diff --git a/pkg/apis/sparkoperator.k8s.io/v1beta2/types.go b/pkg/apis/sparkoperator.k8s.io/v1beta2/types.go
index b84db8bc..2150034f 100644
--- a/pkg/apis/sparkoperator.k8s.io/v1beta2/types.go
+++ b/pkg/apis/sparkoperator.k8s.io/v1beta2/types.go
@@ -276,6 +276,10 @@ type SparkApplicationSpec struct {
// SparkUIOptions allows configuring the Service and the Ingress to expose the sparkUI
// +optional
SparkUIOptions *SparkUIConfiguration `json:"sparkUIOptions,omitempty"`
+ // DynamicAllocation configures dynamic allocation, which has been available for the Kubernetes
+ // scheduler backend since Spark 3.0.
+ // +optional
+ DynamicAllocation *DynamicAllocation `json:"dynamicAllocation,omitempty"`
}
// BatchSchedulerConfiguration used to configure how to batch scheduling Spark Application
@@ -288,7 +292,7 @@ type BatchSchedulerConfiguration struct {
PriorityClassName *string `json:"priorityClassName,omitempty"`
}
-// Specific SparkUI config parameters
+// SparkUIConfiguration is for driver UI-specific configuration parameters.
type SparkUIConfiguration struct {
// ServicePort allows configuring the port at service level that might be different from the targetPort.
// TargetPort should be the same as the one defined in spark.ui.port
@@ -631,6 +635,26 @@ type GPUSpec struct {
Quantity int64 `json:"quantity"`
}
+// DynamicAllocation contains configuration options for dynamic allocation.
+type DynamicAllocation struct {
+ // Enabled controls whether dynamic allocation is enabled or not.
+ Enabled bool `json:"enabled,omitempty"`
+ // InitialExecutors is the initial number of executors to request. If .spec.executor.instances
+ // is also set, the initial number of executors is set to the bigger of that and this option.
+ // +optional
+ InitialExecutors *int32 `json:"initialExecutors,omitempty"`
+ // MinExecutors is the lower bound for the number of executors if dynamic allocation is enabled.
+ // +optional
+ MinExecutors *int32 `json:"minExecutors,omitempty"`
+ // MaxExecutors is the upper bound for the number of executors if dynamic allocation is enabled.
+ // +optional
+ MaxExecutors *int32 `json:"maxExecutors,omitempty"`
+ // ShuffleTrackingTimeout controls the timeout in milliseconds for executors that are holding
+ // shuffle data if shuffle tracking is enabled (true by default if dynamic allocation is enabled).
+ // +optional
+ ShuffleTrackingTimeout *int64 `json:"shuffleTrackingTimeout,omitempty"`
+}
+
// PrometheusMonitoringEnabled returns if Prometheus monitoring is enabled or not.
func (s *SparkApplication) PrometheusMonitoringEnabled() bool {
return s.Spec.Monitoring != nil && s.Spec.Monitoring.Prometheus != nil
diff --git a/pkg/apis/sparkoperator.k8s.io/v1beta2/zz_generated.deepcopy.go b/pkg/apis/sparkoperator.k8s.io/v1beta2/zz_generated.deepcopy.go
index ebd732c3..3b639475 100644
--- a/pkg/apis/sparkoperator.k8s.io/v1beta2/zz_generated.deepcopy.go
+++ b/pkg/apis/sparkoperator.k8s.io/v1beta2/zz_generated.deepcopy.go
@@ -166,6 +166,42 @@ func (in *DriverSpec) DeepCopy() *DriverSpec {
return out
}
+// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
+func (in *DynamicAllocation) DeepCopyInto(out *DynamicAllocation) {
+ *out = *in
+ if in.InitialExecutors != nil {
+ in, out := &in.InitialExecutors, &out.InitialExecutors
+ *out = new(int32)
+ **out = **in
+ }
+ if in.MinExecutors != nil {
+ in, out := &in.MinExecutors, &out.MinExecutors
+ *out = new(int32)
+ **out = **in
+ }
+ if in.MaxExecutors != nil {
+ in, out := &in.MaxExecutors, &out.MaxExecutors
+ *out = new(int32)
+ **out = **in
+ }
+ if in.ShuffleTrackingTimeout != nil {
+ in, out := &in.ShuffleTrackingTimeout, &out.ShuffleTrackingTimeout
+ *out = new(int64)
+ **out = **in
+ }
+ return
+}
+
+// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamicAllocation.
+func (in *DynamicAllocation) DeepCopy() *DynamicAllocation {
+ if in == nil {
+ return nil
+ }
+ out := new(DynamicAllocation)
+ in.DeepCopyInto(out)
+ return out
+}
+
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ExecutorSpec) DeepCopyInto(out *ExecutorSpec) {
*out = *in
@@ -667,6 +703,11 @@ func (in *SparkApplicationSpec) DeepCopyInto(out *SparkApplicationSpec) {
*out = new(SparkUIConfiguration)
(*in).DeepCopyInto(*out)
}
+ if in.DynamicAllocation != nil {
+ in, out := &in.DynamicAllocation, &out.DynamicAllocation
+ *out = new(DynamicAllocation)
+ (*in).DeepCopyInto(*out)
+ }
return
}
diff --git a/pkg/config/constants.go b/pkg/config/constants.go
index 07ba29bf..85deffe5 100644
--- a/pkg/config/constants.go
+++ b/pkg/config/constants.go
@@ -148,6 +148,24 @@ const (
SparkDriverKubernetesMaster = "spark.kubernetes.driver.master"
// SparkDriverServiceAnnotationKeyPrefix is the key prefix of annotations to be added to the driver service.
SparkDriverServiceAnnotationKeyPrefix = "spark.kubernetes.driver.service.annotation."
+ // SparkDynamicAllocationEnabled is the Spark configuration key for specifying if dynamic
+ // allocation is enabled or not.
+ SparkDynamicAllocationEnabled = "spark.dynamicAllocation.enabled"
+ // SparkDynamicAllocationShuffleTrackingEnabled is the Spark configuration key for
+ // specifying if shuffle data tracking is enabled.
+ SparkDynamicAllocationShuffleTrackingEnabled = "spark.dynamicAllocation.shuffleTracking.enabled"
+ // SparkDynamicAllocationShuffleTrackingTimeout is the Spark configuration key for specifying
+ // the shuffle tracking timeout in milliseconds if shuffle tracking is enabled.
+ SparkDynamicAllocationShuffleTrackingTimeout = "spark.dynamicAllocation.shuffleTracking.timeout"
+ // SparkDynamicAllocationInitialExecutors is the Spark configuration key for specifying
+ // the initial number of executors to request if dynamic allocation is enabled.
+ SparkDynamicAllocationInitialExecutors = "spark.dynamicAllocation.initialExecutors"
+ // SparkDynamicAllocationMinExecutors is the Spark configuration key for specifying the
+ // lower bound of the number of executors to request if dynamic allocation is enabled.
+ SparkDynamicAllocationMinExecutors = "spark.dynamicAllocation.minExecutors"
+ // SparkDynamicAllocationMaxExecutors is the Spark configuration key for specifying the
+ // upper bound of the number of executors to request if dynamic allocation is enabled.
+ SparkDynamicAllocationMaxExecutors = "spark.dynamicAllocation.maxExecutors"
)
const (
diff --git a/pkg/controller/sparkapplication/submission.go b/pkg/controller/sparkapplication/submission.go
index 8955e86c..cebf3b00 100644
--- a/pkg/controller/sparkapplication/submission.go
+++ b/pkg/controller/sparkapplication/submission.go
@@ -140,11 +140,6 @@ func buildSubmissionCommandArgs(app *v1beta2.SparkApplication, driverPodName str
args = append(args, "--conf", fmt.Sprintf("spark.hadoop.%s=%s", key, value))
}
- for key, value := range app.Spec.NodeSelector {
- conf := fmt.Sprintf("%s%s=%s", config.SparkNodeSelectorKeyPrefix, key, value)
- args = append(args, "--conf", conf)
- }
-
// Add the driver and executor configuration options.
// Note that when the controller submits the application, it expects that all dependencies are local
// so init-container is not needed and therefore no init-container image needs to be specified.
@@ -163,6 +158,16 @@ func buildSubmissionCommandArgs(app *v1beta2.SparkApplication, driverPodName str
args = append(args, "--conf", option)
}
+ options = addDynamicAllocationConfOptions(app)
+ for _, option := range options {
+ args = append(args, "--conf", option)
+ }
+
+ for key, value := range app.Spec.NodeSelector {
+ conf := fmt.Sprintf("%s%s=%s", config.SparkNodeSelectorKeyPrefix, key, value)
+ args = append(args, "--conf", conf)
+ }
+
if app.Spec.Volumes != nil {
options, err = addLocalDirConfOptions(app)
if err != nil {
@@ -393,6 +398,36 @@ func addExecutorConfOptions(app *v1beta2.SparkApplication, submissionID string)
return executorConfOptions, nil
}
+func addDynamicAllocationConfOptions(app *v1beta2.SparkApplication) []string {
+ if app.Spec.DynamicAllocation == nil {
+ return nil
+ }
+
+ dynamicAllocation := app.Spec.DynamicAllocation
+ if !dynamicAllocation.Enabled {
+ return nil
+ }
+
+ var options []string
+ options = append(options, fmt.Sprintf("%s=true", config.SparkDynamicAllocationEnabled))
+ // Turn on shuffle tracking if dynamic allocation is enabled.
+ options = append(options, fmt.Sprintf("%s=true", config.SparkDynamicAllocationShuffleTrackingEnabled))
+ if dynamicAllocation.InitialExecutors != nil {
+ options = append(options, fmt.Sprintf("%s=%d", config.SparkDynamicAllocationInitialExecutors, *dynamicAllocation.InitialExecutors))
+ }
+ if dynamicAllocation.MinExecutors != nil {
+ options = append(options, fmt.Sprintf("%s=%d", config.SparkDynamicAllocationMinExecutors, *dynamicAllocation.MinExecutors))
+ }
+ if dynamicAllocation.MaxExecutors != nil {
+ options = append(options, fmt.Sprintf("%s=%d", config.SparkDynamicAllocationMaxExecutors, *dynamicAllocation.MaxExecutors))
+ }
+ if dynamicAllocation.ShuffleTrackingTimeout != nil {
+ options = append(options, fmt.Sprintf("%s=%d", config.SparkDynamicAllocationShuffleTrackingTimeout, *dynamicAllocation.ShuffleTrackingTimeout))
+ }
+
+ return options
+}
+
// addLocalDirConfOptions excludes local dir volumes, update SparkApplication and returns local dir config options
func addLocalDirConfOptions(app *v1beta2.SparkApplication) ([]string, error) {
var localDirConfOptions []string
diff --git a/pkg/controller/sparkapplication/submission_test.go b/pkg/controller/sparkapplication/submission_test.go
index 5a26d619..e00015b4 100644
--- a/pkg/controller/sparkapplication/submission_test.go
+++ b/pkg/controller/sparkapplication/submission_test.go
@@ -18,18 +18,20 @@ package sparkapplication
import (
"fmt"
- "github.com/google/uuid"
"reflect"
"sort"
"strconv"
"testing"
+ "github.com/google/uuid"
+
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
+ "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/config"
)
const (
@@ -504,3 +506,40 @@ func TestPopulateLabelsOverride_Driver_Executor(t *testing.T) {
t.Errorf("Executor labels: wanted %+q got %+q", expectedExecutorLabels, executorOptions)
}
}
+
+func TestDynamicAllocationOptions(t *testing.T) {
+ app := &v1beta2.SparkApplication{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "spark-test",
+ UID: "spark-test-1",
+ },
+ Spec: v1beta2.SparkApplicationSpec{},
+ }
+ options := addDynamicAllocationConfOptions(app)
+ assert.Equal(t, 0, len(options))
+
+ app = &v1beta2.SparkApplication{
+ ObjectMeta: metav1.ObjectMeta{
+ Name: "spark-test",
+ UID: "spark-test-1",
+ },
+ Spec: v1beta2.SparkApplicationSpec{
+ DynamicAllocation: &v1beta2.DynamicAllocation{
+ Enabled: true,
+ InitialExecutors: int32ptr(2),
+ MinExecutors: int32ptr(0),
+ MaxExecutors: int32ptr(10),
+ ShuffleTrackingTimeout: int64ptr(6000000),
+ },
+ },
+ }
+
+ options = addDynamicAllocationConfOptions(app)
+ assert.Equal(t, 6, len(options))
+ assert.Equal(t, fmt.Sprintf("%s=true", config.SparkDynamicAllocationEnabled), options[0])
+ assert.Equal(t, fmt.Sprintf("%s=true", config.SparkDynamicAllocationShuffleTrackingEnabled), options[1])
+ assert.Equal(t, fmt.Sprintf("%s=2", config.SparkDynamicAllocationInitialExecutors), options[2])
+ assert.Equal(t, fmt.Sprintf("%s=0", config.SparkDynamicAllocationMinExecutors), options[3])
+ assert.Equal(t, fmt.Sprintf("%s=10", config.SparkDynamicAllocationMaxExecutors), options[4])
+ assert.Equal(t, fmt.Sprintf("%s=6000000", config.SparkDynamicAllocationShuffleTrackingTimeout), options[5])
+}