diff --git a/docs/user-guide.md b/docs/user-guide.md
index 3028cf43..530f5376 100644
--- a/docs/user-guide.md
+++ b/docs/user-guide.md
@@ -627,7 +627,7 @@ Below is an example that shows how to configure the metric system to expose metr
 
 ```yaml
 spec:
-  dep:
+  deps:
     jars:
       - http://central.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.11.0/jmx_prometheus_javaagent-0.11.0.jar
   monitoring:
@@ -689,7 +689,7 @@ spec:
       timeToLiveSeconds: 3600
 ```
 
-Note that this feature requires that informer cache resync to be enabled, which is true by default with a resync internal of 30 seconds. You can change the resync interval by setting the flag `-resync-interval=`.
+Note that this feature requires the informer cache resync to be enabled, which is true by default with a resync interval of 30 seconds. You can change the resync interval by setting the flag `-resync-interval=`.
 
 ## Running Spark Applications on a Schedule using a ScheduledSparkApplication
 
diff --git a/pkg/batchscheduler/interface/interface.go b/pkg/batchscheduler/interface/interface.go
index 0d0d3fd9..06f09c8c 100644
--- a/pkg/batchscheduler/interface/interface.go
+++ b/pkg/batchscheduler/interface/interface.go
@@ -24,5 +24,5 @@ type BatchScheduler interface {
 	Name() string
 
 	ShouldSchedule(app *v1beta2.SparkApplication) bool
-	DoBatchSchedulingOnSubmission(app *v1beta2.SparkApplication) (*v1beta2.SparkApplication, error)
+	DoBatchSchedulingOnSubmission(app *v1beta2.SparkApplication) error
 }
diff --git a/pkg/batchscheduler/volcano/volcano_scheduler.go b/pkg/batchscheduler/volcano/volcano_scheduler.go
index 1832a8cd..e8c9023f 100644
--- a/pkg/batchscheduler/volcano/volcano_scheduler.go
+++ b/pkg/batchscheduler/volcano/volcano_scheduler.go
@@ -18,18 +18,19 @@ package volcano
 
 import (
 	"fmt"
 
+	corev1 "k8s.io/api/core/v1"
 	apiextensionsclient "k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
 	"k8s.io/apimachinery/pkg/api/errors"
 	"k8s.io/apimachinery/pkg/api/resource"
-	"k8s.io/apimachinery/pkg/apis/meta/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
 	"k8s.io/client-go/rest"
 
 	"volcano.sh/volcano/pkg/apis/scheduling/v1alpha2"
 	volcanoclient "volcano.sh/volcano/pkg/client/clientset/versioned"
 
 	"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
-	"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/batchscheduler/interface"
+	schedulerinterface "github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/batchscheduler/interface"
 )
 
 const (
@@ -54,39 +55,36 @@ func (v *VolcanoBatchScheduler) ShouldSchedule(app *v1beta2.SparkApplication) bo
 	return true
 }
 
-func (v *VolcanoBatchScheduler) DoBatchSchedulingOnSubmission(app *v1beta2.SparkApplication) (*v1beta2.SparkApplication, error) {
-	newApp := app.DeepCopy()
-	if newApp.Spec.Executor.Annotations == nil {
-		newApp.Spec.Executor.Annotations = make(map[string]string)
+func (v *VolcanoBatchScheduler) DoBatchSchedulingOnSubmission(app *v1beta2.SparkApplication) error {
+	if app.Spec.Executor.Annotations == nil {
+		app.Spec.Executor.Annotations = make(map[string]string)
 	}
 
-	if newApp.Spec.Driver.Annotations == nil {
-		newApp.Spec.Driver.Annotations = make(map[string]string)
+	if app.Spec.Driver.Annotations == nil {
+		app.Spec.Driver.Annotations = make(map[string]string)
 	}
 
-	if newApp.Spec.Mode == v1beta2.ClientMode {
-		return v.syncPodGroupInClientMode(newApp)
-	} else if newApp.Spec.Mode == v1beta2.ClusterMode {
-		return v.syncPodGroupInClusterMode(newApp)
+	if app.Spec.Mode == v1beta2.ClientMode {
+		return v.syncPodGroupInClientMode(app)
+	} else if app.Spec.Mode == v1beta2.ClusterMode {
+		return v.syncPodGroupInClusterMode(app)
 	}
-	return newApp, nil
+	return nil
 }
 
-func (v *VolcanoBatchScheduler) syncPodGroupInClientMode(app *v1beta2.SparkApplication) (*v1beta2.SparkApplication, error) {
-	//We only care about the executor pods in client mode
-	newApp := app.DeepCopy()
-	if _, ok := newApp.Spec.Executor.Annotations[v1alpha2.GroupNameAnnotationKey]; !ok {
-		//Only executor resource will be considered.
-		if err := v.syncPodGroup(newApp, 1, getExecutorRequestResource(app)); err == nil {
-			newApp.Spec.Executor.Annotations[v1alpha2.GroupNameAnnotationKey] = v.getAppPodGroupName(newApp)
+func (v *VolcanoBatchScheduler) syncPodGroupInClientMode(app *v1beta2.SparkApplication) error {
+	// We only care about the executor pods in client mode
+	if _, ok := app.Spec.Executor.Annotations[v1alpha2.GroupNameAnnotationKey]; !ok {
+		if err := v.syncPodGroup(app, 1, getExecutorRequestResource(app)); err == nil {
+			app.Spec.Executor.Annotations[v1alpha2.GroupNameAnnotationKey] = v.getAppPodGroupName(app)
 		} else {
-			return nil, err
+			return err
 		}
 	}
-	return newApp, nil
+	return nil
 }
 
-func (v *VolcanoBatchScheduler) syncPodGroupInClusterMode(app *v1beta2.SparkApplication) (*v1beta2.SparkApplication, error) {
+func (v *VolcanoBatchScheduler) syncPodGroupInClusterMode(app *v1beta2.SparkApplication) error {
 	//We need both mark Driver and Executor when submitting
 	//NOTE: In cluster mode, the initial size of PodGroup is set to 1 in order to schedule driver pod first.
 	if _, ok := app.Spec.Driver.Annotations[v1alpha2.GroupNameAnnotationKey]; !ok {
@@ -96,10 +94,10 @@ func (v *VolcanoBatchScheduler) syncPodGroupInClusterMode(app *v1beta2.SparkAppl
 			app.Spec.Executor.Annotations[v1alpha2.GroupNameAnnotationKey] = v.getAppPodGroupName(app)
 			app.Spec.Driver.Annotations[v1alpha2.GroupNameAnnotationKey] = v.getAppPodGroupName(app)
 		} else {
-			return nil, err
+			return err
 		}
 	}
-	return app, nil
+	return nil
 }
 
 func (v *VolcanoBatchScheduler) getAppPodGroupName(app *v1beta2.SparkApplication) string {
@@ -109,16 +107,16 @@ func (v *VolcanoBatchScheduler) syncPodGroup(app *v1beta2.SparkApplication, size int32, minResource corev1.ResourceList) error {
 	var err error
 	podGroupName := v.getAppPodGroupName(app)
-	if pg, err := v.volcanoClient.SchedulingV1alpha2().PodGroups(app.Namespace).Get(podGroupName, v1.GetOptions{}); err != nil {
+	if pg, err := v.volcanoClient.SchedulingV1alpha2().PodGroups(app.Namespace).Get(podGroupName, metav1.GetOptions{}); err != nil {
 		if !errors.IsNotFound(err) {
 			return err
 		}
 		podGroup := v1alpha2.PodGroup{
-			ObjectMeta: v1.ObjectMeta{
+			ObjectMeta: metav1.ObjectMeta{
 				Namespace: app.Namespace,
 				Name:      podGroupName,
-				OwnerReferences: []v1.OwnerReference{
-					*v1.NewControllerRef(app, v1beta2.SchemeGroupVersion.WithKind("SparkApplication")),
+				OwnerReferences: []metav1.OwnerReference{
+					*metav1.NewControllerRef(app, v1beta2.SchemeGroupVersion.WithKind("SparkApplication")),
 				},
 			},
 			Spec: v1alpha2.PodGroupSpec{
@@ -164,7 +162,7 @@ func New(config *rest.Config) (schedulerinterface.BatchScheduler, error) {
 	}
 
 	if _, err := extClient.ApiextensionsV1beta1().CustomResourceDefinitions().Get(
-		PodGroupName, v1.GetOptions{}); err != nil {
+		PodGroupName, metav1.GetOptions{}); err != nil {
 		return nil, fmt.Errorf("podGroup CRD is required to exists in current cluster error: %s", err)
 	}
 	return &VolcanoBatchScheduler{
diff --git a/pkg/controller/sparkapplication/controller.go b/pkg/controller/sparkapplication/controller.go
index 7f74fba9..68fe2d30 100644
--- a/pkg/controller/sparkapplication/controller.go
+++ b/pkg/controller/sparkapplication/controller.go
@@ -175,7 +175,6 @@ func (c *Controller) Stop() {
 // Callback function called when a new SparkApplication object gets created.
 func (c *Controller) onAdd(obj interface{}) {
 	app := obj.(*v1beta2.SparkApplication)
-	v1beta2.SetSparkApplicationDefaults(app)
 	glog.Infof("SparkApplication %s/%s was added, enqueueing it for submission", app.Namespace, app.Name)
 	c.enqueue(app)
 }
@@ -618,6 +617,9 @@ func hasRetryIntervalPassed(retryInterval *int64, attemptsDone int32, lastEventT
 
 // submitSparkApplication creates a new submission for the given SparkApplication and submits it using spark-submit.
 func (c *Controller) submitSparkApplication(app *v1beta2.SparkApplication) *v1beta2.SparkApplication {
+	// Apply default values before submitting the application to run.
+	v1beta2.SetSparkApplicationDefaults(app)
+
 	if app.PrometheusMonitoringEnabled() {
 		if err := configPrometheusMonitoring(app, c.kubeClient); err != nil {
 			glog.Error(err)
@@ -626,13 +628,11 @@ func (c *Controller) submitSparkApplication(app *v1beta2.SparkApplication) *v1be
 
 	// Use batch scheduler to perform scheduling task before submitting (before build command arguments).
 	if needScheduling, scheduler := c.shouldDoBatchScheduling(app); needScheduling {
-		newApp, err := scheduler.DoBatchSchedulingOnSubmission(app)
+		err := scheduler.DoBatchSchedulingOnSubmission(app)
 		if err != nil {
 			glog.Errorf("failed to process batch scheduler BeforeSubmitSparkApplication with error %v", err)
 			return app
 		}
-		//Spark submit will use the updated app to submit tasks(Spec will not be updated into API server)
-		app = newApp
 	}
 
 	driverPodName := getDriverPodName(app)
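
For context, the hunks above change the `BatchScheduler` contract from "return a mutated deep copy" to "mutate the given application in place and return only an error", and move `v1beta2.SetSparkApplicationDefaults` from `onAdd` into `submitSparkApplication`. Below is a minimal, hypothetical sketch of a scheduler plugin that satisfies the new interface; the `noopScheduler` type and its package placement are illustrative only, while the method signatures and the `v1beta2` import path are taken from the diff.

```go
package batchscheduler // hypothetical package name, used only for this sketch

import (
	"github.com/GoogleCloudPlatform/spark-on-k8s-operator/pkg/apis/sparkoperator.k8s.io/v1beta2"
)

// noopScheduler is an illustrative implementation of the updated
// BatchScheduler interface (Name, ShouldSchedule, DoBatchSchedulingOnSubmission).
type noopScheduler struct{}

func (s *noopScheduler) Name() string { return "noop" }

func (s *noopScheduler) ShouldSchedule(app *v1beta2.SparkApplication) bool { return true }

// DoBatchSchedulingOnSubmission mutates the SparkApplication handed in by the
// controller and returns only an error. Any annotations added here are seen
// directly when the controller builds the spark-submit arguments, so the
// caller no longer needs the old `app = newApp` re-assignment.
func (s *noopScheduler) DoBatchSchedulingOnSubmission(app *v1beta2.SparkApplication) error {
	if app.Spec.Driver.Annotations == nil {
		app.Spec.Driver.Annotations = make(map[string]string)
	}
	if app.Spec.Executor.Annotations == nil {
		app.Spec.Executor.Annotations = make(map[string]string)
	}
	return nil
}
```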