diff --git a/pkg/detector/aggregate_status.go b/pkg/detector/aggregate_status.go index 79487372b..7b76c85ee 100644 --- a/pkg/detector/aggregate_status.go +++ b/pkg/detector/aggregate_status.go @@ -3,20 +3,18 @@ package detector import ( "context" "encoding/json" - "fmt" "reflect" - "strings" appsv1 "k8s.io/api/apps/v1" batchv1 "k8s.io/api/batch/v1" corev1 "k8s.io/api/core/v1" extensionsv1beta1 "k8s.io/api/extensions/v1beta1" apierrors "k8s.io/apimachinery/pkg/api/errors" - metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/klog/v2" "sigs.k8s.io/controller-runtime/pkg/client" workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" + "github.com/karmada-io/karmada/pkg/util/helper" ) // AggregateDeploymentStatus summarize deployment status and update to original objects. @@ -204,7 +202,7 @@ func (d *ResourceDetector) AggregateJobStatus(objRef workv1alpha2.ObjectReferenc return err } - newStatus, err := d.parsingJobStatus(obj, status, clusters) + newStatus, err := helper.ParsingJobStatus(obj, status, clusters) if err != nil { return err } @@ -222,67 +220,3 @@ func (d *ResourceDetector) AggregateJobStatus(objRef workv1alpha2.ObjectReferenc return nil } - -// getJobFinishedStatus checks whether the given Job has finished execution. -// It does not discriminate between successful and failed terminations. -func (d *ResourceDetector) getJobFinishedStatus(jobStatus *batchv1.JobStatus) (bool, batchv1.JobConditionType) { - for _, c := range jobStatus.Conditions { - if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == corev1.ConditionTrue { - return true, c.Type - } - } - return false, "" -} - -// parsingJobStatus generates new status of given 'AggregatedStatusItem'. -func (d *ResourceDetector) parsingJobStatus(obj *batchv1.Job, status []workv1alpha2.AggregatedStatusItem, clusters []workv1alpha2.TargetCluster) (*batchv1.JobStatus, error) { - var jobFailed []string - successfulJobs := 0 - newStatus := &batchv1.JobStatus{} - for _, item := range status { - if item.Status == nil { - continue - } - temp := &batchv1.JobStatus{} - if err := json.Unmarshal(item.Status.Raw, temp); err != nil { - klog.Errorf("Failed to unmarshal status of job(%s/%s): %v", obj.Namespace, obj.Name, err) - return nil, err - } - klog.V(3).Infof("Grab job(%s/%s) status from cluster(%s), active: %d, succeeded %d, failed: %d", - obj.Namespace, obj.Name, item.ClusterName, temp.Active, temp.Succeeded, temp.Failed) - - newStatus.Active += temp.Active - newStatus.Succeeded += temp.Succeeded - newStatus.Failed += temp.Failed - - isFinished, finishedStatus := d.getJobFinishedStatus(temp) - if isFinished && finishedStatus == batchv1.JobComplete { - successfulJobs++ - } else if isFinished && finishedStatus == batchv1.JobFailed { - jobFailed = append(jobFailed, item.ClusterName) - } - } - - if len(jobFailed) != 0 { - newStatus.Conditions = append(newStatus.Conditions, batchv1.JobCondition{ - Type: batchv1.JobFailed, - Status: corev1.ConditionTrue, - LastProbeTime: metav1.Now(), - LastTransitionTime: metav1.Now(), - Reason: "JobFailed", - Message: fmt.Sprintf("Job executed failed in member clusters %s", strings.Join(jobFailed, ",")), - }) - } - - if successfulJobs == len(clusters) { - newStatus.Conditions = append(newStatus.Conditions, batchv1.JobCondition{ - Type: batchv1.JobComplete, - Status: corev1.ConditionTrue, - LastProbeTime: metav1.Now(), - LastTransitionTime: metav1.Now(), - Reason: "Completed", - Message: "Job completed", - }) - } - return newStatus, nil -} diff --git a/pkg/util/helper/job.go b/pkg/util/helper/job.go new file mode 100644 index 000000000..7896d1332 --- /dev/null +++ b/pkg/util/helper/job.go @@ -0,0 +1,100 @@ +package helper + +import ( + "encoding/json" + "fmt" + "strings" + + batchv1 "k8s.io/api/batch/v1" + corev1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/klog/v2" + + workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2" +) + +// ParsingJobStatus generates new status of given 'AggregatedStatusItem'. +//nolint:gocyclo +func ParsingJobStatus(obj *batchv1.Job, status []workv1alpha2.AggregatedStatusItem, clusters []workv1alpha2.TargetCluster) (*batchv1.JobStatus, error) { + var jobFailed []string + var startTime, completionTime *metav1.Time + successfulJobs, completionJobs := 0, 0 + newStatus := &batchv1.JobStatus{} + for _, item := range status { + if item.Status == nil { + continue + } + temp := &batchv1.JobStatus{} + if err := json.Unmarshal(item.Status.Raw, temp); err != nil { + klog.Errorf("Failed to unmarshal status of job(%s/%s): %v", obj.Namespace, obj.Name, err) + return nil, err + } + klog.V(3).Infof("Grab job(%s/%s) status from cluster(%s), active: %d, succeeded %d, failed: %d", + obj.Namespace, obj.Name, item.ClusterName, temp.Active, temp.Succeeded, temp.Failed) + + newStatus.Active += temp.Active + newStatus.Succeeded += temp.Succeeded + newStatus.Failed += temp.Failed + + isFinished, finishedStatus := getJobFinishedStatus(temp) + if isFinished && finishedStatus == batchv1.JobComplete { + successfulJobs++ + } else if isFinished && finishedStatus == batchv1.JobFailed { + jobFailed = append(jobFailed, item.ClusterName) + } + + // StartTime + if startTime == nil || temp.StartTime.Before(startTime) { + startTime = temp.StartTime + } + // CompletionTime + if temp.CompletionTime != nil { + completionJobs++ + if completionTime == nil || completionTime.Before(temp.CompletionTime) { + completionTime = temp.CompletionTime + } + } + } + + if len(jobFailed) != 0 { + newStatus.Conditions = append(newStatus.Conditions, batchv1.JobCondition{ + Type: batchv1.JobFailed, + Status: corev1.ConditionTrue, + LastProbeTime: metav1.Now(), + LastTransitionTime: metav1.Now(), + Reason: "JobFailed", + Message: fmt.Sprintf("Job executed failed in member clusters %s", strings.Join(jobFailed, ",")), + }) + } + + if successfulJobs == len(clusters) { + newStatus.Conditions = append(newStatus.Conditions, batchv1.JobCondition{ + Type: batchv1.JobComplete, + Status: corev1.ConditionTrue, + LastProbeTime: metav1.Now(), + LastTransitionTime: metav1.Now(), + Reason: "Completed", + Message: "Job completed", + }) + } + + if startTime != nil { + newStatus.StartTime = startTime.DeepCopy() + } + if completionTime != nil && completionJobs == len(clusters) { + newStatus.CompletionTime = completionTime.DeepCopy() + } + + return newStatus, nil +} + +// getJobFinishedStatus checks whether the given Job has finished execution. +// It does not discriminate between successful and failed terminations. +func getJobFinishedStatus(jobStatus *batchv1.JobStatus) (bool, batchv1.JobConditionType) { + for _, c := range jobStatus.Conditions { + if (c.Type == batchv1.JobComplete || c.Type == batchv1.JobFailed) && c.Status == corev1.ConditionTrue { + return true, c.Type + } + } + return false, "" +}