Define SparkApplicationSubmitter interface to allow customizing submitting mechanism

Signed-off-by: Yi Chen <github@chenyicn.net>
This commit is contained in:
Yi Chen 2025-04-09 17:32:30 +08:00
parent 3c4ebc7235
commit 89e9bae50f
4 changed files with 63 additions and 37 deletions

View File

@ -264,6 +264,8 @@ func start() {
}
}
sparkSubmitter := &sparkapplication.SparkSubmitter{}
// Setup controller for SparkApplication.
if err = sparkapplication.NewReconciler(
mgr,
@ -271,6 +273,7 @@ func start() {
mgr.GetClient(),
mgr.GetEventRecorderFor("spark-application-controller"),
registry,
sparkSubmitter,
newSparkApplicationReconcilerOptions(),
).SetupWithManager(mgr, newControllerOptions()); err != nil {
logger.Error(err, "Failed to create controller", "controller", "SparkApplication")

View File

@ -75,12 +75,13 @@ type Options struct {
// Reconciler reconciles a SparkApplication object.
type Reconciler struct {
manager ctrl.Manager
scheme *runtime.Scheme
client client.Client
recorder record.EventRecorder
options Options
registry *scheduler.Registry
manager ctrl.Manager
scheme *runtime.Scheme
client client.Client
recorder record.EventRecorder
registry *scheduler.Registry
submitter SparkApplicationSubmitter
options Options
}
// Reconciler implements reconcile.Reconciler.
@ -93,15 +94,17 @@ func NewReconciler(
client client.Client,
recorder record.EventRecorder,
registry *scheduler.Registry,
submitter SparkApplicationSubmitter,
options Options,
) *Reconciler {
return &Reconciler{
manager: manager,
scheme: scheme,
client: client,
recorder: recorder,
registry: registry,
options: options,
manager: manager,
scheme: scheme,
client: client,
recorder: recorder,
registry: registry,
submitter: submitter,
options: options,
}
}
@ -263,7 +266,7 @@ func (r *Reconciler) reconcileNewSparkApplication(ctx context.Context, req ctrl.
}
app := old.DeepCopy()
_ = r.submitSparkApplication(app)
_ = r.submitSparkApplication(ctx, app)
if err := r.updateSparkApplicationStatus(ctx, app); err != nil {
return err
}
@ -331,7 +334,7 @@ func (r *Reconciler) reconcileFailedSubmissionSparkApplication(ctx context.Conte
}
if timeUntilNextRetryDue <= 0 {
if r.validateSparkResourceDeletion(ctx, app) {
_ = r.submitSparkApplication(app)
_ = r.submitSparkApplication(ctx, app)
} else {
if err := r.deleteSparkResources(ctx, app); err != nil {
logger.Error(err, "failed to delete resources associated with SparkApplication", "name", app.Name, "namespace", app.Namespace)
@ -412,7 +415,7 @@ func (r *Reconciler) reconcilePendingRerunSparkApplication(ctx context.Context,
logger.Info("Successfully deleted resources associated with SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State)
r.recordSparkApplicationEvent(app)
r.resetSparkApplicationStatus(app)
_ = r.submitSparkApplication(app)
_ = r.submitSparkApplication(ctx, app)
}
if err := r.updateSparkApplicationStatus(ctx, app); err != nil {
return err
@ -644,7 +647,7 @@ func (r *Reconciler) getSparkApplication(ctx context.Context, key types.Namespac
}
// submitSparkApplication creates a new submission for the given SparkApplication and submits it using spark-submit.
func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (submitErr error) {
func (r *Reconciler) submitSparkApplication(ctx context.Context, app *v1beta2.SparkApplication) (submitErr error) {
logger.Info("Submitting SparkApplication", "name", app.Name, "namespace", app.Namespace, "state", app.Status.AppState.State)
// SubmissionID must be set before creating any resources to ensure all the resources are labeled.
@ -747,17 +750,11 @@ func (r *Reconciler) submitSparkApplication(app *v1beta2.SparkApplication) (subm
}
}()
sparkSubmitArgs, err := buildSparkSubmitArgs(app)
if err != nil {
return fmt.Errorf("failed to build spark-submit arguments: %v", err)
if err := r.submitter.Submit(ctx, app); err != nil {
r.recordSparkApplicationEvent(app)
return fmt.Errorf("failed to submit spark application: %v", err)
}
// Try submitting the application by running spark-submit.
logger.Info("Running spark-submit for SparkApplication", "name", app.Name, "namespace", app.Namespace, "arguments", sparkSubmitArgs)
if err := runSparkSubmit(newSubmission(sparkSubmitArgs, app)); err != nil {
r.recordSparkApplicationEvent(app)
return fmt.Errorf("failed to run spark-submit: %v", err)
}
return nil
}

View File

@ -122,6 +122,7 @@ var _ = Describe("SparkApplication Controller", func() {
k8sClient,
nil,
nil,
&sparkapplication.SparkSubmitter{},
sparkapplication.Options{Namespaces: []string{appNamespace}, DriverPodCreationGracePeriod: 10 * time.Second},
)
_, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
@ -139,6 +140,7 @@ var _ = Describe("SparkApplication Controller", func() {
k8sClient,
nil,
nil,
&sparkapplication.SparkSubmitter{},
sparkapplication.Options{Namespaces: []string{appNamespace}, DriverPodCreationGracePeriod: 0 * time.Second},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
@ -205,6 +207,7 @@ var _ = Describe("SparkApplication Controller", func() {
k8sClient,
nil,
nil,
&sparkapplication.SparkSubmitter{},
sparkapplication.Options{Namespaces: []string{appNamespace}, DriverPodCreationGracePeriod: 0 * time.Second},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
@ -266,6 +269,7 @@ var _ = Describe("SparkApplication Controller", func() {
k8sClient,
nil,
nil,
&sparkapplication.SparkSubmitter{},
sparkapplication.Options{Namespaces: []string{appNamespace}},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
@ -325,6 +329,7 @@ var _ = Describe("SparkApplication Controller", func() {
k8sClient,
nil,
nil,
&sparkapplication.SparkSubmitter{},
sparkapplication.Options{Namespaces: []string{appNamespace}},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
@ -379,6 +384,7 @@ var _ = Describe("SparkApplication Controller", func() {
k8sClient,
nil,
nil,
&sparkapplication.SparkSubmitter{},
sparkapplication.Options{Namespaces: []string{appNamespace}},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
@ -438,6 +444,7 @@ var _ = Describe("SparkApplication Controller", func() {
k8sClient,
nil,
nil,
&sparkapplication.SparkSubmitter{},
sparkapplication.Options{Namespaces: []string{appNamespace}},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
@ -520,6 +527,7 @@ var _ = Describe("SparkApplication Controller", func() {
k8sClient,
record.NewFakeRecorder(3),
nil,
&sparkapplication.SparkSubmitter{},
sparkapplication.Options{Namespaces: []string{appNamespace}, MaxTrackedExecutorPerApp: 10},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})
@ -539,6 +547,7 @@ var _ = Describe("SparkApplication Controller", func() {
k8sClient,
record.NewFakeRecorder(3),
nil,
&sparkapplication.SparkSubmitter{},
sparkapplication.Options{Namespaces: []string{appNamespace}, MaxTrackedExecutorPerApp: 1},
)
result, err := reconciler.Reconcile(ctx, reconcile.Request{NamespacedName: key})

View File

@ -17,6 +17,7 @@ limitations under the License.
package sparkapplication
import (
"context"
"fmt"
"os"
"os/exec"
@ -26,30 +27,46 @@ import (
"github.com/kubeflow/spark-operator/api/v1beta2"
"github.com/kubeflow/spark-operator/pkg/common"
"github.com/kubeflow/spark-operator/pkg/util"
"sigs.k8s.io/controller-runtime/pkg/log"
)
// submission includes information of a Spark application to be submitted.
type submission struct {
namespace string
name string
args []string
// SparkApplicationSubmitter is the interface for submitting a SparkApplication.
type SparkApplicationSubmitter interface {
Submit(ctx context.Context, app *v1beta2.SparkApplication) error
}
func newSubmission(args []string, app *v1beta2.SparkApplication) *submission {
return &submission{
namespace: app.Namespace,
name: app.Name,
args: args,
// SparkSubmitter submits a SparkApplication by calling spark-submit.
type SparkSubmitter struct {
}
// SparkSubmitter implements SparkApplicationSubmitter interface.
var _ SparkApplicationSubmitter = &SparkSubmitter{}
// Submit implements SparkApplicationSubmitter interface.
func (*SparkSubmitter) Submit(ctx context.Context, app *v1beta2.SparkApplication) error {
logger := log.FromContext(ctx)
args, err := buildSparkSubmitArgs(app)
if err != nil {
return fmt.Errorf("failed to build spark-submit arguments: %v", err)
}
// Try submitting the application by running spark-submit.
logger.Info("Running spark-submit", "arguments", args)
if err := runSparkSubmit(args); err != nil {
return fmt.Errorf("failed to run spark-submit: %v", err)
}
return nil
}
func runSparkSubmit(submission *submission) error {
func runSparkSubmit(args []string) error {
sparkHome, present := os.LookupEnv(common.EnvSparkHome)
if !present {
return fmt.Errorf("env %s is not specified", common.EnvSparkHome)
}
command := filepath.Join(sparkHome, "bin", "spark-submit")
cmd := exec.Command(command, submission.args...)
cmd := exec.Command(command, args...)
_, err := cmd.Output()
if err != nil {
var errorMsg string