Add support for dynamic allocation via shuffle tracking (#976)

Yinan Li 2020-07-16 11:16:47 -07:00 committed by GitHub
parent 555c27a487
commit 3ca74728a3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 326 additions and 9 deletions

View File

@@ -586,6 +586,21 @@ SparkUIConfiguration
<p>SparkUIOptions allows configuring the Service and the Ingress to expose the sparkUI</p>
</td>
</tr>
<tr>
<td>
<code>dynamicAllocation</code></br>
<em>
<a href="#sparkoperator.k8s.io/v1beta2.DynamicAllocation">
DynamicAllocation
</a>
</em>
</td>
<td>
<em>(Optional)</em>
<p>DynamicAllocation configures dynamic allocation, which is available for the Kubernetes
scheduler backend since Spark 3.0.</p>
</td>
</tr>
</table>
</td>
</tr>
@@ -969,6 +984,86 @@ executors to connect to the driver.</p>
<p>
<p>DriverState tells the current state of a spark driver.</p>
</p>
<h3 id="sparkoperator.k8s.io/v1beta2.DynamicAllocation">DynamicAllocation
</h3>
<p>
(<em>Appears on:</em>
<a href="#sparkoperator.k8s.io/v1beta2.SparkApplicationSpec">SparkApplicationSpec</a>)
</p>
<p>
<p>DynamicAllocation contains configuration options for dynamic allocation.</p>
</p>
<table>
<thead>
<tr>
<th>Field</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>
<code>enabled</code></br>
<em>
bool
</em>
</td>
<td>
<p>Enabled controls whether dynamic allocation is enabled or not.</p>
</td>
</tr>
<tr>
<td>
<code>initialExecutors</code></br>
<em>
int32
</em>
</td>
<td>
<em>(Optional)</em>
<p>InitialExecutors is the initial number of executors to request. If .spec.executor.instances
is also set, the initial number of executors is set to the greater of the two.</p>
</td>
</tr>
<tr>
<td>
<code>minExecutors</code></br>
<em>
int32
</em>
</td>
<td>
<em>(Optional)</em>
<p>MinExecutors is the lower bound for the number of executors if dynamic allocation is enabled.</p>
</td>
</tr>
<tr>
<td>
<code>maxExecutors</code></br>
<em>
int32
</em>
</td>
<td>
<em>(Optional)</em>
<p>MaxExecutors is the upper bound for the number of executors if dynamic allocation is enabled.</p>
</td>
</tr>
<tr>
<td>
<code>shuffleTrackingTimeout</code></br>
<em>
int64
</em>
</td>
<td>
<em>(Optional)</em>
<p>ShuffleTrackingTimeout controls the timeout, in milliseconds, for executors holding shuffle
data when shuffle tracking is enabled (shuffle tracking is enabled by default when dynamic allocation is enabled).</p>
</td>
</tr>
</tbody>
</table>
<h3 id="sparkoperator.k8s.io/v1beta2.ExecutorSpec">ExecutorSpec
</h3>
<p>
@@ -2072,6 +2167,21 @@ SparkUIConfiguration
<p>SparkUIOptions allows configuring the Service and the Ingress to expose the sparkUI</p>
</td>
</tr>
<tr>
<td>
<code>dynamicAllocation</code></br>
<em>
<a href="#sparkoperator.k8s.io/v1beta2.DynamicAllocation">
DynamicAllocation
</a>
</em>
</td>
<td>
<em>(Optional)</em>
<p>DynamicAllocation configures dynamic allocation, which is available for the Kubernetes
scheduler backend since Spark 3.0.</p>
</td>
</tr>
</tbody>
</table>
<h3 id="sparkoperator.k8s.io/v1beta2.SparkApplicationStatus">SparkApplicationStatus
@@ -2582,7 +2692,7 @@ string
<a href="#sparkoperator.k8s.io/v1beta2.SparkApplicationSpec">SparkApplicationSpec</a>)
</p>
<p>
<p>Specific SparkUI config parameters</p>
<p>SparkUIConfiguration holds driver UI-specific configuration parameters.</p>
</p>
<table>
<thead>
@@ -2636,5 +2746,5 @@ map[string]string
<hr/>
<p><em>
Generated with <code>gen-crd-api-reference-docs</code>
on git commit <code>f313873</code>.
on git commit <code>555c27a</code>.
</em></p>

View File

@@ -34,6 +34,7 @@ The Kubernetes Operator for Apache Spark ships with a command-line tool called `
* [Using Container LifeCycle Hooks](#using-container-lifecycle-hooks)
* [Python Support](#python-support)
* [Monitoring](#monitoring)
* [Dynamic Allocation](#dynamic-allocation)
* [Working with SparkApplications](#working-with-sparkapplications)
* [Creating a New SparkApplication](#creating-a-new-sparkapplication)
* [Deleting a SparkApplication](#deleting-a-sparkapplication)
@@ -638,6 +639,21 @@ spec:
The operator automatically adds annotations such as `prometheus.io/scrape=true` to the driver and/or executor pods (depending on the values of `.spec.monitoring.exposeDriverMetrics` and `.spec.monitoring.exposeExecutorMetrics`) so that the metrics exposed on the pods can be scraped by a Prometheus server in the same cluster.
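As a minimal sketch of opting in (the `prometheus` block shown here, including the exporter jar path, is an assumption not covered by this diff):
```yaml
spec:
  monitoring:
    exposeDriverMetrics: true
    exposeExecutorMetrics: true
    prometheus:
      # Illustrative path to a JMX exporter agent jar baked into the image.
      jmxExporterJar: /prometheus/jmx_prometheus_javaagent-0.11.0.jar
```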
### Dynamic Allocation
The operator supports a limited form of [Spark Dynamic Resource Allocation](http://spark.apache.org/docs/latest/job-scheduling.html#dynamic-resource-allocation) through the shuffle tracking enhancement introduced in Spark 3.0.0, *without needing an external shuffle service* (which is not available in Kubernetes mode). See this [issue](https://issues.apache.org/jira/browse/SPARK-27963) for details on the enhancement. To enable this limited form of dynamic allocation, follow the example below:
```yaml
spec:
dynamicAllocation:
enabled: true
initialExecutors: 2
minExecutors: 2
maxExecutors: 10
```
Note that if dynamic allocation is enabled, the initial number of executors to request is set to the greater of `.spec.dynamicAllocation.initialExecutors` and `.spec.executor.instances` if both are set.
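As a fuller sketch (values are illustrative), the comments map each field to the Spark property the operator sets at submission time, using the configuration keys introduced in this commit:
```yaml
spec:
  executor:
    instances: 5                      # the initial request becomes max(5, initialExecutors) = 5
  dynamicAllocation:
    enabled: true                     # spark.dynamicAllocation.enabled=true; the operator also
                                      # sets spark.dynamicAllocation.shuffleTracking.enabled=true
    initialExecutors: 2               # spark.dynamicAllocation.initialExecutors=2
    minExecutors: 2                   # spark.dynamicAllocation.minExecutors=2
    maxExecutors: 10                  # spark.dynamicAllocation.maxExecutors=10
    shuffleTrackingTimeout: 3600000   # spark.dynamicAllocation.shuffleTracking.timeout=3600000 (milliseconds)
```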
## Working with SparkApplications
### Creating a New SparkApplication

View File

@@ -1800,6 +1800,23 @@ spec:
type: object
type: array
type: object
dynamicAllocation:
properties:
enabled:
type: boolean
initialExecutors:
format: int32
type: integer
maxExecutors:
format: int32
type: integer
minExecutors:
format: int32
type: integer
shuffleTrackingTimeout:
format: int64
type: integer
type: object
executor:
properties:
affinity:

View File

@@ -1786,6 +1786,23 @@ spec:
type: object
type: array
type: object
dynamicAllocation:
properties:
enabled:
type: boolean
initialExecutors:
format: int32
type: integer
maxExecutors:
format: int32
type: integer
minExecutors:
format: int32
type: integer
shuffleTrackingTimeout:
format: int64
type: integer
type: object
executor:
properties:
affinity:

View File

@@ -276,6 +276,10 @@ type SparkApplicationSpec struct {
// SparkUIOptions allows configuring the Service and the Ingress to expose the sparkUI
// +optional
SparkUIOptions *SparkUIConfiguration `json:"sparkUIOptions,omitempty"`
// DynamicAllocation configures dynamic allocation, which is available for the Kubernetes
// scheduler backend since Spark 3.0.
// +optional
DynamicAllocation *DynamicAllocation `json:"dynamicAllocation,omitempty"`
}
// BatchSchedulerConfiguration used to configure how to batch scheduling Spark Application
@@ -288,7 +292,7 @@ type BatchSchedulerConfiguration struct {
PriorityClassName *string `json:"priorityClassName,omitempty"`
}
// Specific SparkUI config parameters
// SparkUIConfiguration holds driver UI-specific configuration parameters.
type SparkUIConfiguration struct {
// ServicePort allows configuring the port at service level that might be different from the targetPort.
// TargetPort should be the same as the one defined in spark.ui.port
@@ -631,6 +635,26 @@ type GPUSpec struct {
Quantity int64 `json:"quantity"`
}
// DynamicAllocation contains configuration options for dynamic allocation.
type DynamicAllocation struct {
// Enabled controls whether dynamic allocation is enabled or not.
Enabled bool `json:"enabled,omitempty"`
// InitialExecutors is the initial number of executors to request. If .spec.executor.instances
// is also set, the initial number of executors is set to the greater of the two.
// +optional
InitialExecutors *int32 `json:"initialExecutors,omitempty"`
// MinExecutors is the lower bound for the number of executors if dynamic allocation is enabled.
// +optional
MinExecutors *int32 `json:"minExecutors,omitempty"`
// MaxExecutors is the upper bound for the number of executors if dynamic allocation is enabled.
// +optional
MaxExecutors *int32 `json:"maxExecutors,omitempty"`
// ShuffleTrackingTimeout controls the timeout, in milliseconds, for executors holding shuffle
// data when shuffle tracking is enabled (shuffle tracking is enabled by default when dynamic
// allocation is enabled).
// +optional
ShuffleTrackingTimeout *int64 `json:"shuffleTrackingTimeout,omitempty"`
}
// PrometheusMonitoringEnabled returns if Prometheus monitoring is enabled or not.
func (s *SparkApplication) PrometheusMonitoringEnabled() bool {
return s.Spec.Monitoring != nil && s.Spec.Monitoring.Prometheus != nil

View File

@@ -166,6 +166,42 @@ func (in *DriverSpec) DeepCopy() *DriverSpec {
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *DynamicAllocation) DeepCopyInto(out *DynamicAllocation) {
*out = *in
if in.InitialExecutors != nil {
in, out := &in.InitialExecutors, &out.InitialExecutors
*out = new(int32)
**out = **in
}
if in.MinExecutors != nil {
in, out := &in.MinExecutors, &out.MinExecutors
*out = new(int32)
**out = **in
}
if in.MaxExecutors != nil {
in, out := &in.MaxExecutors, &out.MaxExecutors
*out = new(int32)
**out = **in
}
if in.ShuffleTrackingTimeout != nil {
in, out := &in.ShuffleTrackingTimeout, &out.ShuffleTrackingTimeout
*out = new(int64)
**out = **in
}
return
}
// DeepCopy is an autogenerated deepcopy function, copying the receiver, creating a new DynamicAllocation.
func (in *DynamicAllocation) DeepCopy() *DynamicAllocation {
if in == nil {
return nil
}
out := new(DynamicAllocation)
in.DeepCopyInto(out)
return out
}
// DeepCopyInto is an autogenerated deepcopy function, copying the receiver, writing into out. in must be non-nil.
func (in *ExecutorSpec) DeepCopyInto(out *ExecutorSpec) {
*out = *in
@@ -667,6 +703,11 @@ func (in *SparkApplicationSpec) DeepCopyInto(out *SparkApplicationSpec) {
*out = new(SparkUIConfiguration)
(*in).DeepCopyInto(*out)
}
if in.DynamicAllocation != nil {
in, out := &in.DynamicAllocation, &out.DynamicAllocation
*out = new(DynamicAllocation)
(*in).DeepCopyInto(*out)
}
return
}

View File

@@ -148,6 +148,24 @@ const (
SparkDriverKubernetesMaster = "spark.kubernetes.driver.master"
// SparkDriverServiceAnnotationKeyPrefix is the key prefix of annotations to be added to the driver service.
SparkDriverServiceAnnotationKeyPrefix = "spark.kubernetes.driver.service.annotation."
// SparkDynamicAllocationEnabled is the Spark configuration key for specifying if dynamic
// allocation is enabled or not.
SparkDynamicAllocationEnabled = "spark.dynamicAllocation.enabled"
// SparkDynamicAllocationShuffleTrackingEnabled is the Spark configuration key for
// specifying if shuffle data tracking is enabled.
SparkDynamicAllocationShuffleTrackingEnabled = "spark.dynamicAllocation.shuffleTracking.enabled"
// SparkDynamicAllocationShuffleTrackingTimeout is the Spark configuration key for specifying
// the shuffle tracking timeout in milliseconds if shuffle tracking is enabled.
SparkDynamicAllocationShuffleTrackingTimeout = "spark.dynamicAllocation.shuffleTracking.timeout"
// SparkDynamicAllocationInitialExecutors is the Spark configuration key for specifying
// the initial number of executors to request if dynamic allocation is enabled.
SparkDynamicAllocationInitialExecutors = "spark.dynamicAllocation.initialExecutors"
// SparkDynamicAllocationMinExecutors is the Spark configuration key for specifying the
// lower bound of the number of executors to request if dynamic allocation is enabled.
SparkDynamicAllocationMinExecutors = "spark.dynamicAllocation.minExecutors"
// SparkDynamicAllocationMaxExecutors is the Spark configuration key for specifying the
// upper bound of the number of executors to request if dynamic allocation is enabled.
SparkDynamicAllocationMaxExecutors = "spark.dynamicAllocation.maxExecutors"
)
const (

View File

@@ -140,11 +140,6 @@ func buildSubmissionCommandArgs(app *v1beta2.SparkApplication, driverPodName str
args = append(args, "--conf", fmt.Sprintf("spark.hadoop.%s=%s", key, value))
}
for key, value := range app.Spec.NodeSelector {
conf := fmt.Sprintf("%s%s=%s", config.SparkNodeSelectorKeyPrefix, key, value)
args = append(args, "--conf", conf)
}
// Add the driver and executor configuration options.
// Note that when the controller submits the application, it expects that all dependencies are local
// so init-container is not needed and therefore no init-container image needs to be specified.
@@ -163,6 +158,16 @@ func buildSubmissionCommandArgs(app *v1beta2.SparkApplication, driverPodName str
args = append(args, "--conf", option)
}
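// Add the dynamic allocation configuration options, if dynamic allocation is enabled.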
options = addDynamicAllocationConfOptions(app)
for _, option := range options {
args = append(args, "--conf", option)
}
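// Add the node selector configuration options.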
for key, value := range app.Spec.NodeSelector {
conf := fmt.Sprintf("%s%s=%s", config.SparkNodeSelectorKeyPrefix, key, value)
args = append(args, "--conf", conf)
}
if app.Spec.Volumes != nil {
options, err = addLocalDirConfOptions(app)
if err != nil {
@@ -393,6 +398,36 @@ func addExecutorConfOptions(app *v1beta2.SparkApplication, submissionID string)
return executorConfOptions, nil
}
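// addDynamicAllocationConfOptions returns the Spark configuration options derived from
// .spec.dynamicAllocation, or nil if dynamic allocation is not enabled.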
func addDynamicAllocationConfOptions(app *v1beta2.SparkApplication) []string {
if app.Spec.DynamicAllocation == nil {
return nil
}
dynamicAllocation := app.Spec.DynamicAllocation
if !dynamicAllocation.Enabled {
return nil
}
var options []string
options = append(options, fmt.Sprintf("%s=true", config.SparkDynamicAllocationEnabled))
// Turn on shuffle tracking if dynamic allocation is enabled.
options = append(options, fmt.Sprintf("%s=true", config.SparkDynamicAllocationShuffleTrackingEnabled))
if dynamicAllocation.InitialExecutors != nil {
options = append(options, fmt.Sprintf("%s=%d", config.SparkDynamicAllocationInitialExecutors, *dynamicAllocation.InitialExecutors))
}
if dynamicAllocation.MinExecutors != nil {
options = append(options, fmt.Sprintf("%s=%d", config.SparkDynamicAllocationMinExecutors, *dynamicAllocation.MinExecutors))
}
if dynamicAllocation.MaxExecutors != nil {
options = append(options, fmt.Sprintf("%s=%d", config.SparkDynamicAllocationMaxExecutors, *dynamicAllocation.MaxExecutors))
}
if dynamicAllocation.ShuffleTrackingTimeout != nil {
options = append(options, fmt.Sprintf("%s=%d", config.SparkDynamicAllocationShuffleTrackingTimeout, *dynamicAllocation.ShuffleTrackingTimeout))
}
return options
}
// addLocalDirConfOptions excludes local dir volumes, update SparkApplication and returns local dir config options
func addLocalDirConfOptions(app *v1beta2.SparkApplication) ([]string, error) {
var localDirConfOptions []string

View File

@@ -18,18 +18,20 @@ package sparkapplication
import (
"fmt"
"github.com/google/uuid"
"reflect"
"sort"
"strconv"
"testing"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/config"
)
const (
@@ -504,3 +506,40 @@ func TestPopulateLabelsOverride_Driver_Executor(t *testing.T) {
t.Errorf("Executor labels: wanted %+q got %+q", expectedExecutorLabels, executorOptions)
}
}
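// TestDynamicAllocationOptions verifies the --conf options generated by addDynamicAllocationConfOptions.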
func TestDynamicAllocationOptions(t *testing.T) {
app := &v1beta2.SparkApplication{
ObjectMeta: metav1.ObjectMeta{
Name: "spark-test",
UID: "spark-test-1",
},
Spec: v1beta2.SparkApplicationSpec{},
}
options := addDynamicAllocationConfOptions(app)
assert.Equal(t, 0, len(options))
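// With dynamic allocation enabled and all fields set, six options should be generated, in order.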
app = &v1beta2.SparkApplication{
ObjectMeta: metav1.ObjectMeta{
Name: "spark-test",
UID: "spark-test-1",
},
Spec: v1beta2.SparkApplicationSpec{
DynamicAllocation: &v1beta2.DynamicAllocation{
Enabled: true,
InitialExecutors: int32ptr(2),
MinExecutors: int32ptr(0),
MaxExecutors: int32ptr(10),
ShuffleTrackingTimeout: int64ptr(6000000),
},
},
}
options = addDynamicAllocationConfOptions(app)
assert.Equal(t, 6, len(options))
assert.Equal(t, fmt.Sprintf("%s=true", config.SparkDynamicAllocationEnabled), options[0])
assert.Equal(t, fmt.Sprintf("%s=true", config.SparkDynamicAllocationShuffleTrackingEnabled), options[1])
assert.Equal(t, fmt.Sprintf("%s=2", config.SparkDynamicAllocationInitialExecutors), options[2])
assert.Equal(t, fmt.Sprintf("%s=0", config.SparkDynamicAllocationMinExecutors), options[3])
assert.Equal(t, fmt.Sprintf("%s=10", config.SparkDynamicAllocationMaxExecutors), options[4])
assert.Equal(t, fmt.Sprintf("%s=6000000", config.SparkDynamicAllocationShuffleTrackingTimeout), options[5])
}