spark-operator/internal/metrics/sparkapplication_metrics.go

387 lines
16 KiB
Go

/*
Copyright 2024 The Kubeflow authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package metrics
import (
"fmt"
"time"
"github.com/prometheus/client_golang/prometheus"
"sigs.k8s.io/controller-runtime/pkg/metrics"
"github.com/kubeflow/spark-operator/api/v1beta2"
"github.com/kubeflow/spark-operator/pkg/common"
"github.com/kubeflow/spark-operator/pkg/util"
)
// SparkApplicationMetrics bundles the Prometheus collectors that track the
// lifecycle of SparkApplication objects, together with the configuration they
// were built from.
type SparkApplicationMetrics struct {
	// Configuration captured at construction time.
	prefix                 string    // prefix applied to every metric name
	labels                 []string  // normalized label names shared by all collectors
	jobStartLatencyBuckets []float64 // bucket boundaries of the start latency histogram

	// Lifecycle counters and the running gauge.
	count                 *prometheus.CounterVec
	submitCount           *prometheus.CounterVec
	failedSubmissionCount *prometheus.CounterVec
	runningCount          *prometheus.GaugeVec
	successCount          *prometheus.CounterVec
	failureCount          *prometheus.CounterVec

	// Duration observations, all recorded in seconds.
	successExecutionTimeSeconds  *prometheus.SummaryVec
	failureExecutionTimeSeconds  *prometheus.SummaryVec
	startLatencySeconds          *prometheus.SummaryVec
	startLatencySecondsHistogram *prometheus.HistogramVec
}
// NewSparkApplicationMetrics creates the collectors for SparkApplication
// metrics without registering them; call Register to expose them.
//
// prefix is prepended to every metric name, labels are the label names attached
// to every collector (normalized via util.CreateValidMetricNameLabel with an
// empty prefix), and jobStartLatencyBuckets are the bucket boundaries of the
// start latency histogram.
func NewSparkApplicationMetrics(prefix string, labels []string, jobStartLatencyBuckets []float64) *SparkApplicationMetrics {
	validLabels := make([]string, 0, len(labels))
	for _, label := range labels {
		validLabels = append(validLabels, util.CreateValidMetricNameLabel("", label))
	}

	return &SparkApplicationMetrics{
		prefix:                 prefix,
		labels:                 validLabels,
		jobStartLatencyBuckets: jobStartLatencyBuckets,

		count: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: util.CreateValidMetricNameLabel(prefix, common.MetricSparkApplicationCount),
				Help: "Total number of SparkApplication",
			},
			validLabels,
		),
		submitCount: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: util.CreateValidMetricNameLabel(prefix, common.MetricSparkApplicationSubmitCount),
				Help: "Total number of submitted SparkApplication",
			},
			validLabels,
		),
		failedSubmissionCount: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: util.CreateValidMetricNameLabel(prefix, common.MetricSparkApplicationFailedSubmissionCount),
				Help: "Total number of failed SparkApplication submission",
			},
			validLabels,
		),
		runningCount: prometheus.NewGaugeVec(
			prometheus.GaugeOpts{
				Name: util.CreateValidMetricNameLabel(prefix, common.MetricSparkApplicationRunningCount),
				Help: "Total number of running SparkApplication",
			},
			validLabels,
		),
		successCount: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: util.CreateValidMetricNameLabel(prefix, common.MetricSparkApplicationSuccessCount),
				Help: "Total number of successful SparkApplication",
			},
			validLabels,
		),
		failureCount: prometheus.NewCounterVec(
			prometheus.CounterOpts{
				Name: util.CreateValidMetricNameLabel(prefix, common.MetricSparkApplicationFailureCount),
				Help: "Total number of failed SparkApplication",
			},
			validLabels,
		),
		// The two execution time summaries previously had no Help text, which
		// left them undocumented on the metrics endpoint unlike every other
		// metric defined here.
		successExecutionTimeSeconds: prometheus.NewSummaryVec(
			prometheus.SummaryOpts{
				Name: util.CreateValidMetricNameLabel(prefix, common.MetricSparkApplicationSuccessExecutionTimeSeconds),
				Help: "Execution time of successful SparkApplication in seconds",
			},
			validLabels,
		),
		failureExecutionTimeSeconds: prometheus.NewSummaryVec(
			prometheus.SummaryOpts{
				Name: util.CreateValidMetricNameLabel(prefix, common.MetricSparkApplicationFailureExecutionTimeSeconds),
				Help: "Execution time of failed SparkApplication in seconds",
			},
			validLabels,
		),
		startLatencySeconds: prometheus.NewSummaryVec(
			prometheus.SummaryOpts{
				Name: util.CreateValidMetricNameLabel(prefix, common.MetricSparkApplicationStartLatencySeconds),
				Help: "Spark App Start Latency via the Operator",
			},
			validLabels,
		),
		startLatencySecondsHistogram: prometheus.NewHistogramVec(
			prometheus.HistogramOpts{
				Name:    util.CreateValidMetricNameLabel(prefix, common.MetricSparkApplicationStartLatencySecondsHistogram),
				Help:    "Spark App Start Latency counts in buckets via the Operator",
				Buckets: jobStartLatencyBuckets,
			},
			validLabels,
		),
	}
}
// Register adds every SparkApplication collector to the controller-runtime
// metrics registry. A failed registration is logged and does not prevent the
// remaining collectors from being registered.
func (m *SparkApplicationMetrics) Register() {
	// Listed in registration order.
	entries := []struct {
		name      string
		collector prometheus.Collector
	}{
		{common.MetricSparkApplicationCount, m.count},
		{common.MetricSparkApplicationSubmitCount, m.submitCount},
		{common.MetricSparkApplicationFailedSubmissionCount, m.failedSubmissionCount},
		{common.MetricSparkApplicationRunningCount, m.runningCount},
		{common.MetricSparkApplicationSuccessCount, m.successCount},
		{common.MetricSparkApplicationFailureCount, m.failureCount},
		{common.MetricSparkApplicationSuccessExecutionTimeSeconds, m.successExecutionTimeSeconds},
		{common.MetricSparkApplicationFailureExecutionTimeSeconds, m.failureExecutionTimeSeconds},
		{common.MetricSparkApplicationStartLatencySeconds, m.startLatencySeconds},
		{common.MetricSparkApplicationStartLatencySecondsHistogram, m.startLatencySecondsHistogram},
	}

	for _, entry := range entries {
		if err := metrics.Registry.Register(entry.collector); err != nil {
			logger.Error(err, "Failed to register spark application metric", "name", entry.name)
		}
	}
}
// HandleSparkApplicationCreate updates the metric that corresponds to the state
// a newly observed SparkApplication is in. Only counters and the running gauge
// are touched here; no durations are observed. States without a matching case
// are ignored.
func (m *SparkApplicationMetrics) HandleSparkApplicationCreate(app *v1beta2.SparkApplication) {
	switch util.GetApplicationState(app) {
	case v1beta2.ApplicationStateNew:
		m.incCount(app)
	case v1beta2.ApplicationStateSubmitted:
		m.incSubmitCount(app)
	case v1beta2.ApplicationStateFailedSubmission:
		m.incFailedSubmissionCount(app)
	case v1beta2.ApplicationStateRunning:
		m.incRunningCount(app)
	case v1beta2.ApplicationStateCompleted:
		m.incSuccessCount(app)
	case v1beta2.ApplicationStateFailed:
		m.incFailureCount(app)
	}
}
// HandleSparkApplicationUpdate updates metrics when a SparkApplication moves
// from one state to another. Updates that leave the state unchanged are
// ignored.
func (m *SparkApplicationMetrics) HandleSparkApplicationUpdate(oldApp *v1beta2.SparkApplication, newApp *v1beta2.SparkApplication) {
	previous, current := util.GetApplicationState(oldApp), util.GetApplicationState(newApp)
	if previous == current {
		return
	}

	// The application is leaving the running state, so it no longer counts
	// towards the running gauge.
	if previous == v1beta2.ApplicationStateRunning {
		m.decRunningCount(oldApp)
	}

	switch current {
	case v1beta2.ApplicationStateNew:
		m.incCount(newApp)
	case v1beta2.ApplicationStateSubmitted:
		m.incSubmitCount(newApp)
	case v1beta2.ApplicationStateFailedSubmission:
		m.incFailedSubmissionCount(newApp)
	case v1beta2.ApplicationStateRunning:
		m.incRunningCount(newApp)
		m.observeStartLatencySeconds(newApp)
	case v1beta2.ApplicationStateCompleted:
		m.incSuccessCount(newApp)
		m.observeSuccessExecutionTimeSeconds(newApp)
	case v1beta2.ApplicationStateFailed:
		m.incFailureCount(newApp)
		m.observeFailureExecutionTimeSeconds(newApp)
	}
}
// HandleSparkApplicationDelete lowers the running gauge when a SparkApplication
// is deleted while in the running state. Deletions in any other state leave
// the metrics untouched.
func (m *SparkApplicationMetrics) HandleSparkApplicationDelete(app *v1beta2.SparkApplication) {
	if util.GetApplicationState(app) == v1beta2.ApplicationStateRunning {
		m.decRunningCount(app)
	}
}
// incCount increments the SparkApplication counter for the label set derived
// from app. If the counter cannot be resolved the error is logged and nothing
// is incremented.
func (m *SparkApplicationMetrics) incCount(app *v1beta2.SparkApplication) {
	metricLabels := m.getMetricLabels(app)
	// Key/value pairs shared by the error log and the debug log.
	kvs := []any{"name", app.Name, "namespace", app.Namespace, "metric", common.MetricSparkApplicationCount, "labels", metricLabels}

	c, err := m.count.GetMetricWith(metricLabels)
	if err != nil {
		logger.Error(err, "Failed to collect metric for SparkApplication", kvs...)
		return
	}
	c.Inc()
	logger.V(1).Info("Increased spark application count", kvs...)
}
// incSubmitCount increments the submitted SparkApplication counter for the
// label set derived from app. If the counter cannot be resolved the error is
// logged and nothing is incremented.
func (m *SparkApplicationMetrics) incSubmitCount(app *v1beta2.SparkApplication) {
	metricLabels := m.getMetricLabels(app)
	// Key/value pairs shared by the error log and the debug log.
	kvs := []any{"name", app.Name, "namespace", app.Namespace, "metric", common.MetricSparkApplicationSubmitCount, "labels", metricLabels}

	c, err := m.submitCount.GetMetricWith(metricLabels)
	if err != nil {
		logger.Error(err, "Failed to collect metric for SparkApplication", kvs...)
		return
	}
	c.Inc()
	logger.V(1).Info("Increased spark application submit count", kvs...)
}
// incFailedSubmissionCount increments the failed submission counter for the
// label set derived from app. If the counter cannot be resolved the error is
// logged and nothing is incremented.
func (m *SparkApplicationMetrics) incFailedSubmissionCount(app *v1beta2.SparkApplication) {
	metricLabels := m.getMetricLabels(app)
	// Key/value pairs shared by the error log and the debug log.
	kvs := []any{"name", app.Name, "namespace", app.Namespace, "metric", common.MetricSparkApplicationFailedSubmissionCount, "labels", metricLabels}

	c, err := m.failedSubmissionCount.GetMetricWith(metricLabels)
	if err != nil {
		logger.Error(err, "Failed to collect metric for SparkApplication", kvs...)
		return
	}
	c.Inc()
	logger.V(1).Info("Increased spark application failed submission count", kvs...)
}
// incRunningCount raises the running SparkApplication gauge by one for the
// label set derived from app. If the gauge cannot be resolved the error is
// logged and the gauge is left unchanged.
func (m *SparkApplicationMetrics) incRunningCount(app *v1beta2.SparkApplication) {
	metricLabels := m.getMetricLabels(app)
	// Key/value pairs shared by the error log and the debug log.
	kvs := []any{"name", app.Name, "namespace", app.Namespace, "metric", common.MetricSparkApplicationRunningCount, "labels", metricLabels}

	g, err := m.runningCount.GetMetricWith(metricLabels)
	if err != nil {
		logger.Error(err, "Failed to collect metric for SparkApplication", kvs...)
		return
	}
	g.Inc()
	logger.V(1).Info("Increased spark application running count", kvs...)
}
// decRunningCount lowers the running SparkApplication gauge by one for the
// label set derived from app. If the gauge cannot be resolved the error is
// logged and the gauge is left unchanged.
func (m *SparkApplicationMetrics) decRunningCount(app *v1beta2.SparkApplication) {
	metricLabels := m.getMetricLabels(app)
	// Key/value pairs shared by the error log and the debug log.
	kvs := []any{"name", app.Name, "namespace", app.Namespace, "metric", common.MetricSparkApplicationRunningCount, "labels", metricLabels}

	g, err := m.runningCount.GetMetricWith(metricLabels)
	if err != nil {
		logger.Error(err, "Failed to collect metric for SparkApplication", kvs...)
		return
	}
	g.Dec()
	logger.V(1).Info("Decreased SparkApplication running count", kvs...)
}
// incSuccessCount increments the successful SparkApplication counter for the
// label set derived from app. If the counter cannot be resolved the error is
// logged and nothing is incremented.
func (m *SparkApplicationMetrics) incSuccessCount(app *v1beta2.SparkApplication) {
	metricLabels := m.getMetricLabels(app)
	// Key/value pairs shared by the error log and the debug log.
	kvs := []any{"name", app.Name, "namespace", app.Namespace, "metric", common.MetricSparkApplicationSuccessCount, "labels", metricLabels}

	c, err := m.successCount.GetMetricWith(metricLabels)
	if err != nil {
		logger.Error(err, "Failed to collect metric for SparkApplication", kvs...)
		return
	}
	c.Inc()
	logger.V(1).Info("Increased spark application success count", kvs...)
}
// incFailureCount increments the failed SparkApplication counter for the label
// set derived from app. If the counter cannot be resolved the error is logged
// and nothing is incremented.
func (m *SparkApplicationMetrics) incFailureCount(app *v1beta2.SparkApplication) {
	metricLabels := m.getMetricLabels(app)
	// Key/value pairs shared by the error log and the debug log.
	kvs := []any{"name", app.Name, "namespace", app.Namespace, "metric", common.MetricSparkApplicationFailureCount, "labels", metricLabels}

	c, err := m.failureCount.GetMetricWith(metricLabels)
	if err != nil {
		logger.Error(err, "Failed to collect metric for SparkApplication", kvs...)
		return
	}
	c.Inc()
	logger.V(1).Info("Increased spark application failure count", kvs...)
}
// observeSuccessExecutionTimeSeconds records how long a successful
// SparkApplication took, measured from its last submission attempt to its
// termination, in seconds.
//
// Nothing is observed (and an error is logged) when the summary for the app's
// label set cannot be resolved, or when either timestamp is unset.
func (m *SparkApplicationMetrics) observeSuccessExecutionTimeSeconds(app *v1beta2.SparkApplication) {
	labels := m.getMetricLabels(app)
	observer, err := m.successExecutionTimeSeconds.GetMetricWith(labels)
	if err != nil {
		logger.Error(err, "Failed to collect metric for SparkApplication", "name", app.Name, "namespace", app.Namespace, "metric", common.MetricSparkApplicationSuccessExecutionTimeSeconds, "labels", labels)
		// The observer must not be used when the lookup failed; falling
		// through would call Observe on an unusable value.
		return
	}
	if app.Status.LastSubmissionAttemptTime.IsZero() || app.Status.TerminationTime.IsZero() {
		err := fmt.Errorf("last submission attempt time or termination time is zero")
		logger.Error(err, "Failed to collect metric for SparkApplication", "name", app.Name, "namespace", app.Namespace, "metric", common.MetricSparkApplicationSuccessExecutionTimeSeconds, "labels", labels)
		return
	}
	duration := app.Status.TerminationTime.Sub(app.Status.LastSubmissionAttemptTime.Time)
	observer.Observe(duration.Seconds())
	logger.V(1).Info("Observed spark application success execution time seconds", "name", app.Name, "namespace", app.Namespace, "metric", common.MetricSparkApplicationSuccessExecutionTimeSeconds, "labels", labels, "value", duration.Seconds())
}
// observeFailureExecutionTimeSeconds records how long a failed
// SparkApplication ran, measured from its last submission attempt to its
// termination, in seconds.
//
// Nothing is observed (and an error is logged) when the summary for the app's
// label set cannot be resolved, or when either timestamp is unset.
func (m *SparkApplicationMetrics) observeFailureExecutionTimeSeconds(app *v1beta2.SparkApplication) {
	labels := m.getMetricLabels(app)
	observer, err := m.failureExecutionTimeSeconds.GetMetricWith(labels)
	if err != nil {
		logger.Error(err, "Failed to collect metric for SparkApplication", "name", app.Name, "namespace", app.Namespace, "metric", common.MetricSparkApplicationFailureExecutionTimeSeconds, "labels", labels)
		// The observer must not be used when the lookup failed; falling
		// through would call Observe on an unusable value.
		return
	}
	if app.Status.LastSubmissionAttemptTime.IsZero() || app.Status.TerminationTime.IsZero() {
		err := fmt.Errorf("last submission attempt time or termination time is zero")
		logger.Error(err, "Failed to collect metric for SparkApplication", "name", app.Name, "namespace", app.Namespace, "metric", common.MetricSparkApplicationFailureExecutionTimeSeconds, "labels", labels)
		return
	}
	duration := app.Status.TerminationTime.Sub(app.Status.LastSubmissionAttemptTime.Time)
	observer.Observe(duration.Seconds())
	logger.V(1).Info("Observed spark application failure execution time seconds", "name", app.Name, "namespace", app.Namespace, "metric", common.MetricSparkApplicationFailureExecutionTimeSeconds, "labels", labels, "value", duration.Seconds())
}
// observeStartLatencySeconds records the time elapsed since the
// SparkApplication was created, in seconds, in both the start latency summary
// and the start latency histogram. The two collectors are handled
// independently: failing to resolve one is logged and does not prevent the
// other from being observed.
func (m *SparkApplicationMetrics) observeStartLatencySeconds(app *v1beta2.SparkApplication) {
	// Start latency is only exported for the first execution attempt.
	if app.Status.ExecutionAttempts != 1 {
		return
	}

	metricLabels := m.getMetricLabels(app)
	seconds := time.Since(app.CreationTimestamp.Time).Seconds()

	summary, err := m.startLatencySeconds.GetMetricWith(metricLabels)
	if err != nil {
		logger.Error(err, "Failed to collect metric for SparkApplication", "name", app.Name, "namespace", app.Namespace, "metric", common.MetricSparkApplicationStartLatencySeconds, "labels", metricLabels)
	} else {
		summary.Observe(seconds)
		logger.V(1).Info("Observed spark application start latency seconds", "name", app.Name, "namespace", app.Namespace, "metric", common.MetricSparkApplicationStartLatencySeconds, "labels", metricLabels, "value", seconds)
	}

	hist, err := m.startLatencySecondsHistogram.GetMetricWith(metricLabels)
	if err != nil {
		logger.Error(err, "Failed to collect metric for SparkApplication", "name", app.Name, "namespace", app.Namespace, "metric", common.MetricSparkApplicationStartLatencySecondsHistogram, "labels", metricLabels)
	} else {
		hist.Observe(seconds)
		logger.V(1).Info("Observed spark application start latency seconds", "name", app.Name, "namespace", app.Namespace, "metric", common.MetricSparkApplicationStartLatencySecondsHistogram, "labels", metricLabels, "value", seconds)
	}
}
// getMetricLabels builds the label values for app, with exactly one entry per
// configured metric label name.
//
// For each configured name the value is, in order of preference: the value of
// the app label whose normalized key equals that name; the app's namespace when
// the name is "namespace"; otherwise the literal "Unknown".
func (m *SparkApplicationMetrics) getMetricLabels(app *v1beta2.SparkApplication) map[string]string {
	// Normalize the app's label keys with an empty prefix, exactly as
	// NewSparkApplicationMetrics does for the configured label names, so that
	// both sides of the lookup below come from the same transformation.
	// Normalizing with m.prefix here could yield keys that never equal a
	// configured name when the prefix is non-empty.
	normalized := make(map[string]string, len(app.Labels))
	for key, val := range app.Labels {
		normalized[util.CreateValidMetricNameLabel("", key)] = val
	}

	metricLabels := make(map[string]string, len(m.labels))
	for _, label := range m.labels {
		val, found := normalized[label]
		switch {
		case found:
			metricLabels[label] = val
		case label == "namespace":
			metricLabels[label] = app.Namespace
		default:
			metricLabels[label] = "Unknown"
		}
	}
	return metricLabels
}