Added support for specifying init-containers for driver/executors (#740)

Yinan Li 2019-12-16 16:15:52 -08:00 committed by GitHub
parent bb65e6b535
commit 86fb3bd51d
4 changed files with 228 additions and 66 deletions


@ -27,6 +27,7 @@ The Kubernetes Operator for Apache Spark ships with a command-line tool called `
* [Using Tolerations](#using-tolerations)
* [Using Pod Security Context](#using-pod-security-context)
* [Using Sidecar Containers](#using-sidecar-containers)
* [Using Init-Containers](#using-init-containers)
* [Using Volume For Scratch Space](#using-volume-for-scratch-space)
* [Python Support](#python-support)
* [Monitoring](#monitoring)
@ -397,6 +398,27 @@ spec:
...
```
### Using Init-Containers
A `SparkApplication` can optionally specify one or more [init-containers](https://kubernetes.io/docs/concepts/workloads/pods/init-containers/) for the driver or executor pod, using the optional field `.spec.driver.initContainers` or `.spec.executor.initContainers`, respectively. The specification of each init-container follows the [Container](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.14/#container-v1-core) API definition. Below is an example:
```yaml
spec:
  driver:
    initContainers:
    - name: "init-container1"
      image: "init-container1:latest"
    ...
  executor:
    initContainers:
    - name: "init-container1"
      image: "init-container1:latest"
    ...
```
Note that the mutating admission webhook is needed to use this feature. Please refer to the
[Quick Start Guide](quick-start-guide.md) on how to enable the mutating admission webhook.
### Using DNS Settings
A `SparkApplication` can define DNS settings for the driver and/or executor pod by adding the standard [DNS](https://kubernetes.io/docs/concepts/services-networking/dns-pod-service/#pod-s-dns-config) Kubernetes settings. The fields for adding such configuration are `.spec.driver.dnsConfig` and `.spec.executor.dnsConfig`. Example:
@ -418,7 +440,6 @@ spec:
Note that the mutating admission webhook is needed to use this feature. Please refer to the
[Quick Start Guide](quick-start-guide.md) on how to enable the mutating admission webhook.
### Using Volume For Scratch Space
By default, Spark uses temporary scratch space to spill data to disk during shuffles and other operations.
To use a volume for scratch space, the volume's name must start with `spark-local-dir-`.


@ -102,17 +102,17 @@ type ScheduledSparkApplicationSpec struct {
// Template is a template from which SparkApplication instances can be created.
Template SparkApplicationSpec `json:"template"`
// Suspend is a flag telling the controller to suspend subsequent runs of the application if set to true.
// Optional.
// +optional
// Defaults to false.
Suspend *bool `json:"suspend,omitempty"`
// ConcurrencyPolicy is the policy governing concurrent SparkApplication runs.
ConcurrencyPolicy ConcurrencyPolicy `json:"concurrencyPolicy,omitempty"`
// SuccessfulRunHistoryLimit is the number of past successful runs of the application to keep.
// Optional.
// +optional
// Defaults to 1.
SuccessfulRunHistoryLimit *int32 `json:"successfulRunHistoryLimit,omitempty"`
// FailedRunHistoryLimit is the number of past failed runs of the application to keep.
// Optional.
// +optional
// Defaults to 1.
FailedRunHistoryLimit *int32 `json:"failedRunHistoryLimit,omitempty"`
}
@ -177,102 +177,103 @@ type SparkApplicationSpec struct {
Mode DeployMode `json:"mode,omitempty"`
// Image is the container image for the driver, executor, and init-container. Any custom container images for the
// driver, executor, or init-container takes precedence over this.
// Optional.
// +optional
Image *string `json:"image,omitempty"`
// InitContainerImage is the image of the init-container to use. Overrides Spec.Image if set.
// Optional.
// +optional
// +deprecated
InitContainerImage *string `json:"initContainerImage,omitempty"`
// ImagePullPolicy is the image pull policy for the driver, executor, and init-container.
// Optional.
// +optional
ImagePullPolicy *string `json:"imagePullPolicy,omitempty"`
// ImagePullSecrets is the list of image-pull secrets.
// Optional.
// +optional
ImagePullSecrets []string `json:"imagePullSecrets,omitempty"`
// MainClass is the fully-qualified main class of the Spark application.
// This only applies to Java/Scala Spark applications.
// Optional.
// +optional
MainClass *string `json:"mainClass,omitempty"`
// MainFile is the path to a bundled JAR, Python, or R file of the application.
// Optional.
// +optional
MainApplicationFile *string `json:"mainApplicationFile"`
// Arguments is a list of arguments to be passed to the application.
// Optional.
// +optional
Arguments []string `json:"arguments,omitempty"`
// SparkConf carries user-specified Spark configuration properties as they would use the "--conf" option in
// spark-submit.
// Optional.
// +optional
SparkConf map[string]string `json:"sparkConf,omitempty"`
// HadoopConf carries user-specified Hadoop configuration properties as they would use the "--conf" option
// in spark-submit. The SparkApplication controller automatically adds prefix "spark.hadoop." to Hadoop
// configuration properties.
// Optional.
// +optional
HadoopConf map[string]string `json:"hadoopConf,omitempty"`
// SparkConfigMap carries the name of the ConfigMap containing Spark configuration files such as log4j.properties.
// The controller will add environment variable SPARK_CONF_DIR to the path where the ConfigMap is mounted to.
// Optional.
// +optional
SparkConfigMap *string `json:"sparkConfigMap,omitempty"`
// HadoopConfigMap carries the name of the ConfigMap containing Hadoop configuration files such as core-site.xml.
// The controller will add environment variable HADOOP_CONF_DIR to the path where the ConfigMap is mounted to.
// Optional.
// +optional
HadoopConfigMap *string `json:"hadoopConfigMap,omitempty"`
// Volumes is the list of Kubernetes volumes that can be mounted by the driver and/or executors.
// Optional.
// +optional
Volumes []apiv1.Volume `json:"volumes,omitempty"`
// Driver is the driver specification.
Driver DriverSpec `json:"driver"`
// Executor is the executor specification.
Executor ExecutorSpec `json:"executor"`
// Deps captures all possible types of dependencies of a Spark application.
// Optional.
// +optional
Deps Dependencies `json:"deps,omitempty"`
// RestartPolicy defines the policy on if and in which conditions the controller should restart an application.
RestartPolicy RestartPolicy `json:"restartPolicy,omitempty"`
// NodeSelector is the Kubernetes node selector to be added to the driver and executor pods.
// This field is mutually exclusive with nodeSelector at podSpec level (driver or executor).
// This field will be deprecated in future versions (at SparkApplicationSpec level).
// Optional.
// +optional
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
// FailureRetries is the number of times to retry a failed application before giving up.
// This is best effort and actual retry attempts can be >= the value specified.
// Optional.
// +optional
FailureRetries *int32 `json:"failureRetries,omitempty"`
// RetryInterval is the unit of intervals in seconds between submission retries.
// Optional.
// +optional
RetryInterval *int64 `json:"retryInterval,omitempty"`
// This sets the major Python version of the docker
// image used to run the driver and executor containers. Can either be 2 or 3, default 2.
// Optional.
// +optional
// +kubebuilder:validation:Enum={"2","3"}
PythonVersion *string `json:"pythonVersion,omitempty"`
// This sets the Memory Overhead Factor that will allocate memory to non-JVM memory.
// For JVM-based jobs this value will default to 0.10, for non-JVM jobs 0.40. Value of this field will
// be overridden by `Spec.Driver.MemoryOverhead` and `Spec.Executor.MemoryOverhead` if they are set.
// Optional.
// +optional
MemoryOverheadFactor *string `json:"memoryOverheadFactor,omitempty"`
// Monitoring configures how monitoring is handled.
// Optional.
// +optional
Monitoring *MonitoringSpec `json:"monitoring,omitempty"`
// BatchScheduler configures which batch scheduler will be used for scheduling
// Optional.
// +optional
BatchScheduler *string `json:"batchScheduler,omitempty"`
// TimeToLiveSeconds defines the Time-To-Live (TTL) duration in seconds for this SparkApplication
// after its termination.
// The SparkApplication object will be garbage collected if the current time is more than the
// TimeToLiveSeconds since its termination.
// Optional.
// +optional
TimeToLiveSeconds *int64 `json:"timeToLiveSeconds,omitempty"`
// BatchSchedulerOptions provides fine-grained control over how batch scheduling is performed.
// Optional.
// +optional
BatchSchedulerOptions *BatchSchedulerConfiguration `json:"batchSchedulerOptions,omitempty"`
}
// BatchSchedulerConfiguration is used to configure batch scheduling of Spark applications.
type BatchSchedulerConfiguration struct {
// Queue stands for the resource queue to which the application belongs; it is used by the Volcano batch scheduler.
// Optional.
// +optional
Queue *string `json:"queue,omitempty"`
// PriorityClassName stands for the name of the Kubernetes PriorityClass resource; it is used by the Volcano batch scheduler.
// Optional.
// +optional
PriorityClassName *string `json:"priorityClassName,omitempty"`
}
@ -348,24 +349,32 @@ type SparkApplicationList struct {
// Dependencies specifies all possible types of dependencies of a Spark application.
type Dependencies struct {
// Jars is a list of JAR files the Spark application depends on.
// Optional.
// +optional
Jars []string `json:"jars,omitempty"`
// Files is a list of files the Spark application depends on.
// Optional.
// +optional
Files []string `json:"files,omitempty"`
// PyFiles is a list of Python files the Spark application depends on.
// Optional.
// +optional
PyFiles []string `json:"pyFiles,omitempty"`
// JarsDownloadDir is the location to download jars to in the driver and executors.
// +optional
// +deprecated
JarsDownloadDir *string `json:"jarsDownloadDir,omitempty"`
// FilesDownloadDir is the location to download files to in the driver and executors.
// +optional
// +deprecated
FilesDownloadDir *string `json:"filesDownloadDir,omitempty"`
// DownloadTimeout specifies the timeout in seconds before aborting the attempt to download
// and unpack dependencies from remote locations into the driver and executor pods.
// +optional
// +deprecated
// +kubebuilder:validation:Minimum=1
DownloadTimeout *int32 `json:"downloadTimeout,omitempty"`
// MaxSimultaneousDownloads specifies the maximum number of remote dependencies to download
// simultaneously in a driver or executor pod.
// +optional
// +deprecated
// +kubebuilder:validation:Minimum=1
MaxSimultaneousDownloads *int32 `json:"maxSimultaneousDownloads,omitempty"`
}
@ -374,74 +383,77 @@ type Dependencies struct {
// TODO: investigate if we should use v1.PodSpec and limit what can be set instead.
type SparkPodSpec struct {
// Cores is the number of CPU cores to request for the pod.
// Optional.
// +optional
// +kubebuilder:validation:Minimum=1
Cores *int32 `json:"cores,omitempty"`
// CoreLimit specifies a hard limit on CPU cores for the pod.
// Optional
CoreLimit *string `json:"coreLimit,omitempty"`
// Memory is the amount of memory to request for the pod.
// Optional.
// +optional
Memory *string `json:"memory,omitempty"`
// MemoryOverhead is the amount of off-heap memory to allocate in cluster mode, in MiB unless otherwise specified.
// Optional.
// +optional
MemoryOverhead *string `json:"memoryOverhead,omitempty"`
// GPU specifies GPU requirement for the pod.
// Optional.
// +optional
GPU *GPUSpec `json:"gpu,omitempty"`
// Image is the container image to use. Overrides Spec.Image if set.
// Optional.
// +optional
Image *string `json:"image,omitempty"`
// ConfigMaps carries information of other ConfigMaps to add to the pod.
// Optional.
// +optional
ConfigMaps []NamePath `json:"configMaps,omitempty"`
// Secrets carries information of secrets to add to the pod.
// Optional.
// +optional
Secrets []SecretInfo `json:"secrets,omitempty"`
// Env carries the environment variables to add to the pod.
// Optional.
// +optional
Env []apiv1.EnvVar `json:"env,omitempty"`
// EnvVars carries the environment variables to add to the pod.
// Optional.
// +optional
// DEPRECATED.
EnvVars map[string]string `json:"envVars,omitempty"`
// EnvSecretKeyRefs holds a mapping from environment variable names to SecretKeyRefs.
// Optional.
// +optional
// DEPRECATED.
EnvSecretKeyRefs map[string]NameKey `json:"envSecretKeyRefs,omitempty"`
// Labels are the Kubernetes labels to be added to the pod.
// Optional.
// +optional
Labels map[string]string `json:"labels,omitempty"`
// Annotations are the Kubernetes annotations to be added to the pod.
// Optional.
// +optional
Annotations map[string]string `json:"annotations,omitempty"`
// VolumeMounts specifies the volumes listed in ".spec.volumes" to mount into the main container's filesystem.
// Optional.
// +optional
VolumeMounts []apiv1.VolumeMount `json:"volumeMounts,omitempty"`
// Affinity specifies the affinity/anti-affinity settings for the pod.
// Optional.
// +optional
Affinity *apiv1.Affinity `json:"affinity,omitempty"`
// Tolerations specifies the tolerations listed in ".spec.tolerations" to be applied to the pod.
// Optional.
// +optional
Tolerations []apiv1.Toleration `json:"tolerations,omitempty"`
// SecurityContenxt specifies the PodSecurityContext to apply.
// Optional.
// +optional
SecurityContenxt *apiv1.PodSecurityContext `json:"securityContext,omitempty"`
// SchedulerName specifies the scheduler that will be used for scheduling
// Optional.
// +optional
SchedulerName *string `json:"schedulerName,omitempty"`
// Sidecars is a list of sidecar containers that run alongside the main Spark container.
// Optional.
// +optional
Sidecars []apiv1.Container `json:"sidecars,omitempty"`
// InitContainers is a list of init-containers that run to completion before the main Spark container.
// +optional
InitContainers []apiv1.Container `json:"initContainers,omitempty"`
// HostNetwork indicates whether to request host networking for the pod or not.
// Optional.
// +optional
HostNetwork *bool `json:"hostNetwork,omitempty"`
// NodeSelector is the Kubernetes node selector to be added to the driver and executor pods.
// This field is mutually exclusive with nodeSelector at SparkApplication level (which will be deprecated).
// Optional.
// +optional
NodeSelector map[string]string `json:"nodeSelector,omitempty"`
// DNSConfig specifies the DNS settings for the pod, following the Kubernetes specifications.
// Optional.
// +optional
DNSConfig *apiv1.PodDNSConfig `json:"dnsConfig,omitempty"`
}
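Since both `DriverSpec` and `ExecutorSpec` embed `SparkPodSpec`, the new `InitContainers` field applies to drivers and executors alike. The following sketch (not part of this commit) shows how the YAML example from the user guide maps onto these Go types; the `v1beta2` import path is an assumption based on the operator's usual package layout.

```go
package main

import (
	"encoding/json"
	"fmt"

	corev1 "k8s.io/api/core/v1"

	// Assumed import path of the v1beta2 API package shown in this diff.
	"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
)

func main() {
	initContainers := []corev1.Container{
		{Name: "init-container1", Image: "init-container1:latest"},
	}
	spec := v1beta2.SparkApplicationSpec{
		// Driver and executor pick up InitContainers through the embedded SparkPodSpec.
		Driver:   v1beta2.DriverSpec{SparkPodSpec: v1beta2.SparkPodSpec{InitContainers: initContainers}},
		Executor: v1beta2.ExecutorSpec{SparkPodSpec: v1beta2.SparkPodSpec{InitContainers: initContainers}},
	}
	out, _ := json.MarshalIndent(spec, "", "  ")
	// The serialized spec carries "initContainers" under both driver and executor.
	fmt.Println(string(out))
}
```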
@ -452,7 +464,7 @@ type DriverSpec struct {
// in-cluster client mode in which the user creates a client pod where the driver of
// the user application runs. It's an error to set this field if Mode is not
// in-cluster-client.
// Optional.
// +optional
// +kubebuilder:validation:Pattern=[a-z0-9]([-a-z0-9]*[a-z0-9])?(\\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*
PodName *string `json:"podName,omitempty"`
// ServiceAccount is the name of the Kubernetes service account used by the driver pod
@ -467,11 +479,11 @@ type DriverSpec struct {
type ExecutorSpec struct {
SparkPodSpec `json:",inline"`
// Instances is the number of executor instances.
// Optional.
// +optional
// +kubebuilder:validation:Minimum=1
Instances *int32 `json:"instances,omitempty"`
// CoreRequest is the physical CPU core request for the executors.
// Optional.
// +optional
CoreRequest *string `json:"coreRequest,omitempty"`
// JavaOptions is a string of extra JVM options to pass to the executors. For instance,
// GC settings or other logging.
@ -534,11 +546,11 @@ type MonitoringSpec struct {
// ExposeExecutorMetrics specifies whether to expose metrics on the executors.
ExposeExecutorMetrics bool `json:"exposeExecutorMetrics"`
// MetricsProperties is the content of a custom metrics.properties for configuring the Spark metric system.
// Optional.
// +optional
// If not specified, the content in spark-docker/conf/metrics.properties will be used.
MetricsProperties *string `json:"metricsProperties,omitempty"`
// Prometheus is for configuring the Prometheus JMX exporter.
// Optional.
// +optional
Prometheus *PrometheusSpec `json:"prometheus,omitempty"`
}
@ -548,7 +560,7 @@ type PrometheusSpec struct {
// JmxExporterJar is the path to the Prometheus JMX exporter jar in the container.
JmxExporterJar string `json:"jmxExporterJar"`
// Port is the port of the HTTP server run by the Prometheus JMX exporter.
// Optional.
// +optional
// If not specified, 8090 will be used as the default.
// +kubebuilder:validation:Minimum=1024
// +kubebuilder:validation:Maximum=49151
@ -557,7 +569,7 @@ type PrometheusSpec struct {
// ConfigFile takes precedence over Configuration, which is shown below.
ConfigFile *string `json:"configFile,omitempty"`
// Configuration is the content of the Prometheus configuration needed by the Prometheus JMX exporter.
// Optional.
// +optional
// If not specified, the content in spark-docker/conf/prometheus.yaml will be used.
// Configuration has no effect if ConfigFile is set.
Configuration *string `json:"configuration,omitempty"`


@ -56,6 +56,7 @@ func patchSparkPod(pod *corev1.Pod, app *v1beta2.SparkApplication) []patchOperat
patchOps = append(patchOps, addPrometheusConfigMap(pod, app)...)
patchOps = append(patchOps, addTolerations(pod, app)...)
patchOps = append(patchOps, addSidecarContainers(pod, app)...)
patchOps = append(patchOps, addInitContainers(pod, app)...)
patchOps = append(patchOps, addHostNetwork(pod, app)...)
patchOps = append(patchOps, addNodeSelectors(pod, app)...)
patchOps = append(patchOps, addDNSConfig(pod, app)...)
@ -453,6 +454,34 @@ func addSidecarContainers(pod *corev1.Pod, app *v1beta2.SparkApplication) []patc
return ops
}
// addInitContainers returns patch operations that add the init-containers
// specified for the driver or executor to the given pod.
func addInitContainers(pod *corev1.Pod, app *v1beta2.SparkApplication) []patchOperation {
	var initContainers []corev1.Container
	if util.IsDriverPod(pod) {
		initContainers = app.Spec.Driver.InitContainers
	} else if util.IsExecutorPod(pod) {
		initContainers = app.Spec.Executor.InitContainers
	}

	// If the pod has no init-containers yet, the first patch must create the
	// /spec/initContainers array before later patches can append to it.
	first := false
	if len(pod.Spec.InitContainers) == 0 {
		first = true
	}

	var ops []patchOperation
	for _, c := range initContainers {
		// Copy the loop variable so the patch value does not alias the iterator.
		sd := c
		if first {
			first = false
			value := []corev1.Container{sd}
			ops = append(ops, patchOperation{Op: "add", Path: "/spec/initContainers", Value: value})
		} else if !hasInitContainer(pod, &sd) {
			ops = append(ops, patchOperation{Op: "add", Path: "/spec/initContainers/-", Value: &sd})
		}
	}

	return ops
}
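To make the patch semantics concrete: for a pod that starts with no init-containers, the first operation creates `/spec/initContainers` with a one-element array, and subsequent operations append via the JSON Patch end-of-array index `-`. The standalone sketch below reproduces that output shape; the `patchOperation` field names and their RFC 6902 JSON tags (`op`, `path`, `value`) are assumed from how the struct is used above.

```go
package main

import (
	"encoding/json"
	"fmt"

	corev1 "k8s.io/api/core/v1"
)

// patchOperation mirrors the webhook's patch struct; the JSON tags are assumed
// to follow RFC 6902 (JSON Patch).
type patchOperation struct {
	Op    string      `json:"op"`
	Path  string      `json:"path"`
	Value interface{} `json:"value,omitempty"`
}

func main() {
	initContainers := []corev1.Container{
		{Name: "init-container1", Image: "init-container1:latest"},
		{Name: "init-container2", Image: "init-container2:latest"},
	}
	// First op creates the /spec/initContainers array; later ops append to it.
	ops := []patchOperation{
		{Op: "add", Path: "/spec/initContainers", Value: []corev1.Container{initContainers[0]}},
		{Op: "add", Path: "/spec/initContainers/-", Value: &initContainers[1]},
	}
	out, _ := json.MarshalIndent(ops, "", "  ")
	fmt.Println(string(out))
}
```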
func addGPU(pod *corev1.Pod, app *v1beta2.SparkApplication) *patchOperation {
var gpu *v1beta2.GPUSpec
if util.IsDriverPod(pod) {
@ -522,3 +551,12 @@ func hasContainer(pod *corev1.Pod, container *corev1.Container) bool {
}
return false
}
// hasInitContainer returns true if the pod already has an init-container with
// the same name and image as the given container.
func hasInitContainer(pod *corev1.Pod, container *corev1.Container) bool {
	for _, c := range pod.Spec.InitContainers {
		if container.Name == c.Name && container.Image == c.Image {
			return true
		}
	}
	return false
}


@ -774,6 +774,97 @@ func TestPatchSparkPod_Sidecars(t *testing.T) {
assert.Equal(t, "sidecar2", modifiedExecutorPod.Spec.Containers[2].Name)
}
func TestPatchSparkPod_InitContainers(t *testing.T) {
app := &v1beta2.SparkApplication{
ObjectMeta: metav1.ObjectMeta{
Name: "spark-test",
UID: "spark-test-1",
},
Spec: v1beta2.SparkApplicationSpec{
Driver: v1beta2.DriverSpec{
SparkPodSpec: v1beta2.SparkPodSpec{
InitContainers: []corev1.Container{
{
Name: "init-container1",
Image: "init-container1:latest",
},
{
Name: "init-container2",
Image: "init-container2:latest",
},
},
},
},
Executor: v1beta2.ExecutorSpec{
SparkPodSpec: v1beta2.SparkPodSpec{
InitContainers: []corev1.Container{
{
Name: "init-container1",
Image: "init-container1:latest",
},
{
Name: "init-container2",
Image: "init-container2:latest",
},
},
},
},
},
}
driverPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "spark-driver",
Labels: map[string]string{
config.SparkRoleLabel: config.SparkDriverRole,
config.LaunchedBySparkOperatorLabel: "true",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: config.SparkDriverContainerName,
Image: "spark-driver:latest",
},
},
},
}
modifiedDriverPod, err := getModifiedPod(driverPod, app)
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 2, len(modifiedDriverPod.Spec.InitContainers))
assert.Equal(t, "init-container1", modifiedDriverPod.Spec.InitContainers[0].Name)
assert.Equal(t, "init-container2", modifiedDriverPod.Spec.InitContainers[1].Name)
executorPod := &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: "spark-executor",
Labels: map[string]string{
config.SparkRoleLabel: config.SparkExecutorRole,
config.LaunchedBySparkOperatorLabel: "true",
},
},
Spec: corev1.PodSpec{
Containers: []corev1.Container{
{
Name: config.SparkExecutorContainerName,
Image: "spark-executor:latest",
},
},
},
}
modifiedExecutorPod, err := getModifiedPod(executorPod, app)
if err != nil {
t.Fatal(err)
}
assert.Equal(t, 2, len(modifiedExecutorPod.Spec.InitContainers))
assert.Equal(t, "init-container1", modifiedExecutorPod.Spec.InitContainers[0].Name)
assert.Equal(t, "init-container2", modifiedExecutorPod.Spec.InitContainers[1].Name)
}
func TestPatchSparkPod_DNSConfig(t *testing.T) {
aVal := "5"
sampleDNSConfig := &corev1.PodDNSConfig{
@ -957,7 +1048,7 @@ func TestPatchSparkPod_GPU(t *testing.T) {
quantity := modifiedPod.Spec.Containers[0].Resources.Limits[corev1.ResourceName(test.gpuSpec.Name)]
count, succeed := (&quantity).AsInt64()
if succeed != true {
t.Fatal(fmt.Errorf("value cannot be represented in an int64 OR would result in a loss of precision."))
t.Fatal(fmt.Errorf("value cannot be represented in an int64 OR would result in a loss of precision"))
}
assert.Equal(t, test.gpuSpec.Quantity, count)
}
@ -967,7 +1058,7 @@ func TestPatchSparkPod_GPU(t *testing.T) {
quantity := modifiedPod.Spec.Containers[0].Resources.Requests[corev1.ResourceCPU]
count, succeed := (&quantity).AsInt64()
if succeed != true {
t.Fatal(fmt.Errorf("value cannot be represented in an int64 OR would result in a loss of precision."))
t.Fatal(fmt.Errorf("value cannot be represented in an int64 OR would result in a loss of precision"))
}
assert.Equal(t, *test.cpuRequests, count)
}
@ -977,7 +1068,7 @@ func TestPatchSparkPod_GPU(t *testing.T) {
quantity := modifiedPod.Spec.Containers[0].Resources.Limits[corev1.ResourceCPU]
count, succeed := (&quantity).AsInt64()
if succeed != true {
t.Fatal(fmt.Errorf("value cannot be represented in an int64 OR would result in a loss of precision."))
t.Fatal(fmt.Errorf("value cannot be represented in an int64 OR would result in a loss of precision"))
}
assert.Equal(t, *test.cpuLimits, count)
}
@ -1038,22 +1129,22 @@ func TestPatchSparkPod_GPU(t *testing.T) {
&cpuRequest,
},
{
&v1beta2.GPUSpec{"example.com/gpu", 1},
&v1beta2.GPUSpec{Name: "example.com/gpu", Quantity: 1},
nil,
nil,
},
{
&v1beta2.GPUSpec{"example.com/gpu", 1},
&v1beta2.GPUSpec{Name: "example.com/gpu", Quantity: 1},
&cpuLimit,
nil,
},
{
&v1beta2.GPUSpec{"example.com/gpu", 1},
&v1beta2.GPUSpec{Name: "example.com/gpu", Quantity: 1},
nil,
&cpuRequest,
},
{
&v1beta2.GPUSpec{"example.com/gpu", 1},
&v1beta2.GPUSpec{Name: "example.com/gpu", Quantity: 1},
&cpuLimit,
&cpuRequest,
},