karmada/pkg/resourceinterpreter/default/native/aggregatestatus.go

620 lines
25 KiB
Go

/*
Copyright 2022 The Karmada Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package native
import (
"encoding/json"
"reflect"
appsv1 "k8s.io/api/apps/v1"
autoscalingv2 "k8s.io/api/autoscaling/v2"
batchv1 "k8s.io/api/batch/v1"
corev1 "k8s.io/api/core/v1"
networkingv1 "k8s.io/api/networking/v1"
policyv1 "k8s.io/api/policy/v1"
"k8s.io/apimachinery/pkg/api/equality"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/klog/v2"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
)
type aggregateStatusInterpreter func(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error)
func getAllDefaultAggregateStatusInterpreter() map[schema.GroupVersionKind]aggregateStatusInterpreter {
s := make(map[schema.GroupVersionKind]aggregateStatusInterpreter)
s[appsv1.SchemeGroupVersion.WithKind(util.DeploymentKind)] = aggregateDeploymentStatus
s[corev1.SchemeGroupVersion.WithKind(util.ServiceKind)] = aggregateServiceStatus
s[networkingv1.SchemeGroupVersion.WithKind(util.IngressKind)] = aggregateIngressStatus
s[batchv1.SchemeGroupVersion.WithKind(util.JobKind)] = aggregateJobStatus
s[batchv1.SchemeGroupVersion.WithKind(util.CronJobKind)] = aggregateCronJobStatus
s[appsv1.SchemeGroupVersion.WithKind(util.DaemonSetKind)] = aggregateDaemonSetStatus
s[appsv1.SchemeGroupVersion.WithKind(util.StatefulSetKind)] = aggregateStatefulSetStatus
s[corev1.SchemeGroupVersion.WithKind(util.PodKind)] = aggregatePodStatus
s[corev1.SchemeGroupVersion.WithKind(util.PersistentVolumeKind)] = aggregatePersistentVolumeStatus
s[corev1.SchemeGroupVersion.WithKind(util.PersistentVolumeClaimKind)] = aggregatePersistentVolumeClaimStatus
s[policyv1.SchemeGroupVersion.WithKind(util.PodDisruptionBudgetKind)] = aggregatePodDisruptionBudgetStatus
s[autoscalingv2.SchemeGroupVersion.WithKind(util.HorizontalPodAutoscalerKind)] = aggregateHorizontalPodAutoscalerStatus
return s
}
func aggregateDeploymentStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) {
deploy := &appsv1.Deployment{}
err := helper.ConvertToTypedObject(object, deploy)
if err != nil {
return nil, err
}
oldStatus := &deploy.Status
newStatus := &appsv1.DeploymentStatus{}
observedLatestResourceTemplateGenerationCount := 0
for _, item := range aggregatedStatusItems {
if item.Status == nil {
continue
}
member := &WrappedDeploymentStatus{}
if err = json.Unmarshal(item.Status.Raw, member); err != nil {
return nil, err
}
klog.V(3).Infof("Grab deployment(%s/%s) status from cluster(%s), replicas: %d, ready: %d, updated: %d, available: %d, unavailable: %d",
deploy.Namespace, deploy.Name, item.ClusterName, member.Replicas, member.ReadyReplicas, member.UpdatedReplicas, member.AvailableReplicas, member.UnavailableReplicas)
// `memberStatus.ObservedGeneration >= memberStatus.Generation` means the member's status corresponds the latest spec revision of the member deployment.
// `memberStatus.ResourceTemplateGeneration >= deploy.Generation` means the member deployment has been aligned with the latest spec revision of federated deployment.
// If both conditions are met, we consider the member's status corresponds the latest spec revision of federated deployment.
if member.ObservedGeneration >= member.Generation &&
member.ResourceTemplateGeneration >= deploy.Generation {
observedLatestResourceTemplateGenerationCount++
}
newStatus.Replicas += member.Replicas
newStatus.ReadyReplicas += member.ReadyReplicas
newStatus.UpdatedReplicas += member.UpdatedReplicas
newStatus.AvailableReplicas += member.AvailableReplicas
newStatus.UnavailableReplicas += member.UnavailableReplicas
}
// The 'observedGeneration' is mainly used by GitOps tools(like 'Argo CD') to assess the health status.
// For more details, please refer to https://argo-cd.readthedocs.io/en/stable/operator-manual/health/.
newStatus.ObservedGeneration = oldStatus.ObservedGeneration
if observedLatestResourceTemplateGenerationCount == len(aggregatedStatusItems) {
newStatus.ObservedGeneration = deploy.Generation
}
if oldStatus.ObservedGeneration == newStatus.ObservedGeneration &&
oldStatus.Replicas == newStatus.Replicas &&
oldStatus.ReadyReplicas == newStatus.ReadyReplicas &&
oldStatus.UpdatedReplicas == newStatus.UpdatedReplicas &&
oldStatus.AvailableReplicas == newStatus.AvailableReplicas &&
oldStatus.UnavailableReplicas == newStatus.UnavailableReplicas {
klog.V(3).Infof("Ignore update deployment(%s/%s) status as up to date", deploy.Namespace, deploy.Name)
return object, nil
}
oldStatus.ObservedGeneration = newStatus.ObservedGeneration
oldStatus.Replicas = newStatus.Replicas
oldStatus.ReadyReplicas = newStatus.ReadyReplicas
oldStatus.UpdatedReplicas = newStatus.UpdatedReplicas
oldStatus.AvailableReplicas = newStatus.AvailableReplicas
oldStatus.UnavailableReplicas = newStatus.UnavailableReplicas
return helper.ToUnstructured(deploy)
}
func aggregateServiceStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) {
service := &corev1.Service{}
err := helper.ConvertToTypedObject(object, service)
if err != nil {
return nil, err
}
if service.Spec.Type != corev1.ServiceTypeLoadBalancer {
return object, nil
}
// If service type is of type LoadBalancer, collect the status.loadBalancer.ingress
newStatus := &corev1.ServiceStatus{}
for _, item := range aggregatedStatusItems {
if item.Status == nil {
continue
}
temp := &corev1.ServiceStatus{}
if err := json.Unmarshal(item.Status.Raw, temp); err != nil {
klog.Errorf("Failed to unmarshal status of service(%s/%s): %v", service.Namespace, service.Name, err)
return nil, err
}
klog.V(3).Infof("Grab service(%s/%s) status from cluster(%s), loadBalancer status: %v",
service.Namespace, service.Name, item.ClusterName, temp.LoadBalancer)
newStatus.LoadBalancer.Ingress = append(newStatus.LoadBalancer.Ingress, temp.LoadBalancer.Ingress...)
}
if reflect.DeepEqual(service.Status, *newStatus) {
klog.V(3).Infof("Ignore update service(%s/%s) status as up to date", service.Namespace, service.Name)
return object, nil
}
service.Status = *newStatus
return helper.ToUnstructured(service)
}
func aggregateIngressStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) {
ingress := &networkingv1.Ingress{}
err := helper.ConvertToTypedObject(object, ingress)
if err != nil {
return nil, err
}
newStatus := &networkingv1.IngressStatus{}
for _, item := range aggregatedStatusItems {
if item.Status == nil {
continue
}
temp := &networkingv1.IngressStatus{}
if err := json.Unmarshal(item.Status.Raw, temp); err != nil {
klog.Errorf("Failed to unmarshal status ingress(%s/%s): %v", ingress.Namespace, ingress.Name, err)
return nil, err
}
klog.V(3).Infof("Grab ingress(%s/%s) status from cluster(%s), loadBalancer status: %v",
ingress.Namespace, ingress.Name, item.ClusterName, temp.LoadBalancer)
newStatus.LoadBalancer.Ingress = append(newStatus.LoadBalancer.Ingress, temp.LoadBalancer.Ingress...)
}
if reflect.DeepEqual(ingress.Status, *newStatus) {
klog.V(3).Infof("Ignore update ingress(%s/%s) status as up to date", ingress.Namespace, ingress.Name)
return object, nil
}
ingress.Status = *newStatus
return helper.ToUnstructured(ingress)
}
func aggregateJobStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) {
job := &batchv1.Job{}
err := helper.ConvertToTypedObject(object, job)
if err != nil {
return nil, err
}
// If a job is finished, we should never update status again.
if finished, _ := helper.GetJobFinishedStatus(&job.Status); finished {
return object, nil
}
newStatus, err := helper.ParsingJobStatus(job, aggregatedStatusItems)
if err != nil {
return nil, err
}
if reflect.DeepEqual(job.Status, *newStatus) {
klog.V(3).Infof("Ignore update job(%s/%s) status as up to date", job.Namespace, job.Name)
return object, nil
}
job.Status = *newStatus
return helper.ToUnstructured(job)
}
func aggregateCronJobStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) {
cronjob := &batchv1.CronJob{}
err := helper.ConvertToTypedObject(object, cronjob)
if err != nil {
return nil, err
}
newStatus := &batchv1.CronJobStatus{}
for _, item := range aggregatedStatusItems {
if item.Status == nil {
continue
}
temp := &batchv1.CronJobStatus{}
if err = json.Unmarshal(item.Status.Raw, temp); err != nil {
return nil, err
}
klog.V(3).Infof("Grab cronJob(%s/%s) status from cluster(%s), active: %+v, lastScheduleTime: %+v, lastSuccessfulTime: %+v",
cronjob.Namespace, cronjob.Name, item.ClusterName, temp.Active, temp.LastScheduleTime, temp.LastSuccessfulTime)
newStatus.Active = append(newStatus.Active, temp.Active...)
if newStatus.LastScheduleTime == nil {
newStatus.LastScheduleTime = temp.LastScheduleTime
}
if newStatus.LastScheduleTime != nil && temp.LastScheduleTime != nil && newStatus.LastScheduleTime.Before(temp.LastScheduleTime) {
newStatus.LastScheduleTime = temp.LastScheduleTime
}
if newStatus.LastSuccessfulTime == nil {
newStatus.LastSuccessfulTime = temp.LastSuccessfulTime
}
if newStatus.LastSuccessfulTime != nil && temp.LastSuccessfulTime != nil && newStatus.LastSuccessfulTime.Before(temp.LastSuccessfulTime) {
newStatus.LastSuccessfulTime = temp.LastSuccessfulTime
}
}
if reflect.DeepEqual(cronjob.Status, *newStatus) {
klog.V(3).Infof("Ignore update cronjob(%s/%s) status as up to date", cronjob.Namespace, cronjob.Name)
return object, nil
}
cronjob.Status.Active = newStatus.Active
cronjob.Status.LastScheduleTime = newStatus.LastScheduleTime
cronjob.Status.LastSuccessfulTime = newStatus.LastSuccessfulTime
return helper.ToUnstructured(cronjob)
}
func aggregateDaemonSetStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) {
daemonSet := &appsv1.DaemonSet{}
err := helper.ConvertToTypedObject(object, daemonSet)
if err != nil {
return nil, err
}
oldStatus := &daemonSet.Status
newStatus := &appsv1.DaemonSetStatus{}
observedLatestResourceTemplateGenerationCount := 0
for _, item := range aggregatedStatusItems {
if item.Status == nil {
continue
}
member := &WrappedDaemonSetStatus{}
if err = json.Unmarshal(item.Status.Raw, member); err != nil {
return nil, err
}
klog.V(3).Infof("Grab daemonSet(%s/%s) status from cluster(%s), currentNumberScheduled: %d, desiredNumberScheduled: %d, numberAvailable: %d, numberMisscheduled: %d, numberReady: %d, updatedNumberScheduled: %d, numberUnavailable: %d",
daemonSet.Namespace, daemonSet.Name, item.ClusterName, member.CurrentNumberScheduled, member.DesiredNumberScheduled, member.NumberAvailable, member.NumberMisscheduled, member.NumberReady, member.UpdatedNumberScheduled, member.NumberUnavailable)
// `memberStatus.ObservedGeneration >= memberStatus.Generation` means the member's status corresponds the latest spec revision of the member DaemonSet.
// `memberStatus.ResourceTemplateGeneration >= daemonSet.Generation` means the member DaemonSet has been aligned with the latest spec revision of federated DaemonSet.
// If both conditions are met, we consider the member's status corresponds the latest spec revision of federated DaemonSet.
if member.ObservedGeneration >= member.Generation &&
member.ResourceTemplateGeneration >= daemonSet.Generation {
observedLatestResourceTemplateGenerationCount++
}
newStatus.CurrentNumberScheduled += member.CurrentNumberScheduled
newStatus.DesiredNumberScheduled += member.DesiredNumberScheduled
newStatus.NumberAvailable += member.NumberAvailable
newStatus.NumberMisscheduled += member.NumberMisscheduled
newStatus.NumberReady += member.NumberReady
newStatus.UpdatedNumberScheduled += member.UpdatedNumberScheduled
newStatus.NumberUnavailable += member.NumberUnavailable
}
// The 'observedGeneration' is mainly used by GitOps tools(like 'Argo CD') to assess the health status.
// For more details, please refer to https://argo-cd.readthedocs.io/en/stable/operator-manual/health/.
newStatus.ObservedGeneration = oldStatus.ObservedGeneration
if observedLatestResourceTemplateGenerationCount == len(aggregatedStatusItems) {
newStatus.ObservedGeneration = daemonSet.Generation
}
if equality.Semantic.DeepEqual(oldStatus, newStatus) {
klog.V(3).Infof("Ignore update daemonSet(%s/%s) status as up to date", daemonSet.Namespace, daemonSet.Name)
return object, nil
}
oldStatus.ObservedGeneration = newStatus.ObservedGeneration
oldStatus.CurrentNumberScheduled = newStatus.CurrentNumberScheduled
oldStatus.DesiredNumberScheduled = newStatus.DesiredNumberScheduled
oldStatus.NumberAvailable = newStatus.NumberAvailable
oldStatus.NumberMisscheduled = newStatus.NumberMisscheduled
oldStatus.NumberReady = newStatus.NumberReady
oldStatus.UpdatedNumberScheduled = newStatus.UpdatedNumberScheduled
oldStatus.NumberUnavailable = newStatus.NumberUnavailable
return helper.ToUnstructured(daemonSet)
}
func aggregateStatefulSetStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) {
statefulSet := &appsv1.StatefulSet{}
err := helper.ConvertToTypedObject(object, statefulSet)
if err != nil {
return nil, err
}
oldStatus := &statefulSet.Status
newStatus := &appsv1.StatefulSetStatus{}
observedLatestResourceTemplateGenerationCount := 0
for _, item := range aggregatedStatusItems {
if item.Status == nil {
continue
}
member := &WrappedStatefulSetStatus{}
if err = json.Unmarshal(item.Status.Raw, member); err != nil {
return nil, err
}
klog.V(3).Infof("Grab statefulSet(%s/%s) status from cluster(%s), availableReplicas: %d, currentReplicas: %d, readyReplicas: %d, replicas: %d, updatedReplicas: %d",
statefulSet.Namespace, statefulSet.Name, item.ClusterName, member.AvailableReplicas, member.CurrentReplicas, member.ReadyReplicas, member.Replicas, member.UpdatedReplicas)
// `memberStatus.ObservedGeneration >= memberStatus.Generation` means the member's status corresponds the latest spec revision of the member statefulset.
// `memberStatus.ResourceTemplateGeneration >= deploy.Generation` means the member statefulset has been aligned with the latest spec revision of federated statefulset.
// If both conditions are met, we consider the member's status corresponds the latest spec revision of federated statefulset.
if member.ObservedGeneration >= member.Generation &&
member.ResourceTemplateGeneration >= member.Generation {
observedLatestResourceTemplateGenerationCount++
}
newStatus.AvailableReplicas += member.AvailableReplicas
newStatus.CurrentReplicas += member.CurrentReplicas
newStatus.ReadyReplicas += member.ReadyReplicas
newStatus.Replicas += member.Replicas
newStatus.UpdatedReplicas += member.UpdatedReplicas
}
// The 'observedGeneration' is mainly used by GitOps tools(like 'Argo CD') to assess the health status.
// For more details, please refer to https://argo-cd.readthedocs.io/en/stable/operator-manual/health/.
newStatus.ObservedGeneration = oldStatus.ObservedGeneration
if observedLatestResourceTemplateGenerationCount == len(aggregatedStatusItems) {
newStatus.ObservedGeneration = statefulSet.Generation
}
if oldStatus.ObservedGeneration == newStatus.ObservedGeneration &&
oldStatus.AvailableReplicas == newStatus.AvailableReplicas &&
oldStatus.CurrentReplicas == newStatus.CurrentReplicas &&
oldStatus.ReadyReplicas == newStatus.ReadyReplicas &&
oldStatus.Replicas == newStatus.Replicas &&
oldStatus.UpdatedReplicas == newStatus.UpdatedReplicas {
klog.V(3).Infof("Ignore update statefulSet(%s/%s) status as up to date", statefulSet.Namespace, statefulSet.Name)
return object, nil
}
oldStatus.ObservedGeneration = newStatus.ObservedGeneration
oldStatus.AvailableReplicas = newStatus.AvailableReplicas
oldStatus.CurrentReplicas = newStatus.CurrentReplicas
oldStatus.ReadyReplicas = newStatus.ReadyReplicas
oldStatus.Replicas = newStatus.Replicas
oldStatus.UpdatedReplicas = newStatus.UpdatedReplicas
return helper.ToUnstructured(statefulSet)
}
func aggregatePodStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) {
pod := &corev1.Pod{}
err := helper.ConvertToTypedObject(object, pod)
if err != nil {
return nil, err
}
if aggregatedStatusItems == nil {
return helper.ToUnstructured(pod)
}
newStatus := &corev1.PodStatus{}
newStatus.ContainerStatuses = make([]corev1.ContainerStatus, 0)
podPhases := sets.NewString()
for _, item := range aggregatedStatusItems {
if item.Status == nil {
// maybe pod's status hasn't been collected yet, assume it's in pending state.
// Otherwise, it may affect the final aggregated state.
podPhases.Insert(string(corev1.PodPending))
continue
}
temp := &corev1.PodStatus{}
if err = json.Unmarshal(item.Status.Raw, temp); err != nil {
return nil, err
}
podPhases.Insert(string(temp.Phase))
for _, containerStatus := range temp.ContainerStatuses {
tempStatus := corev1.ContainerStatus{
Ready: containerStatus.Ready,
State: containerStatus.State,
}
newStatus.ContainerStatuses = append(newStatus.ContainerStatuses, tempStatus)
}
for _, initContainerStatus := range temp.InitContainerStatuses {
tempStatus := corev1.ContainerStatus{
Ready: initContainerStatus.Ready,
State: initContainerStatus.State,
}
newStatus.InitContainerStatuses = append(newStatus.InitContainerStatuses, tempStatus)
}
klog.V(3).Infof("Grab pod(%s/%s) status from cluster(%s), phase: %s", pod.Namespace,
pod.Name, item.ClusterName, temp.Phase)
}
// Check final phase in order: PodFailed-->PodPending-->PodRunning-->PodSucceeded
// Ignore to check PodUnknown as it has been deprecated since year 2015.
// More details please refer to: https://github.com/karmada-io/karmada/issues/2137
switch {
case podPhases.Has(string(corev1.PodFailed)):
newStatus.Phase = corev1.PodFailed
case podPhases.Has(string(corev1.PodPending)):
newStatus.Phase = corev1.PodPending
case podPhases.Has(string(corev1.PodRunning)):
newStatus.Phase = corev1.PodRunning
case podPhases.Has(string(corev1.PodSucceeded)):
newStatus.Phase = corev1.PodSucceeded
default:
klog.Errorf("SHOULD-NEVER-HAPPEN, maybe Pod added a new state that Karmada don't know about.")
}
if reflect.DeepEqual(pod.Status, *newStatus) {
klog.V(3).Infof("Ignore update pod(%s/%s) status as up to date", pod.Namespace, pod.Name)
return object, nil
}
pod.Status = *newStatus
return helper.ToUnstructured(pod)
}
func aggregatePersistentVolumeStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) {
pv := &corev1.PersistentVolume{}
err := helper.ConvertToTypedObject(object, pv)
if err != nil {
return nil, err
}
newStatus := &corev1.PersistentVolumeStatus{}
volumePhases := sets.NewString()
for _, item := range aggregatedStatusItems {
if item.Status == nil {
// maybe volume's status hasn't been collected yet, assume it's in pending state.
// Otherwise, it may affect the final aggregated state.
volumePhases.Insert(string(corev1.VolumePending))
continue
}
temp := &corev1.PersistentVolumeStatus{}
if err = json.Unmarshal(item.Status.Raw, temp); err != nil {
return nil, err
}
klog.V(3).Infof("Grab persistentVolume(%s/%s) status from cluster(%s), phase: %s",
pv.Namespace, pv.Name, item.ClusterName, temp.Phase)
volumePhases.Insert(string(temp.Phase))
}
// Check final phase in order: VolumeFailed-->VolumePending-->VolumeAvailable-->VolumeBound-->VolumeReleased
// More details please refer to: https://github.com/karmada-io/karmada/issues/2394
switch {
case volumePhases.Has(string(corev1.VolumeFailed)):
newStatus.Phase = corev1.VolumeFailed
case volumePhases.Has(string(corev1.VolumePending)):
newStatus.Phase = corev1.VolumePending
case volumePhases.Has(string(corev1.VolumeAvailable)):
newStatus.Phase = corev1.VolumeAvailable
case volumePhases.Has(string(corev1.VolumeBound)):
newStatus.Phase = corev1.VolumeBound
case volumePhases.Has(string(corev1.VolumeReleased)):
newStatus.Phase = corev1.VolumeReleased
default:
klog.Errorf("SHOULD-NEVER-HAPPEN, maybe volume added a new state that Karmada don't know about.")
}
if reflect.DeepEqual(pv.Status, *newStatus) {
klog.V(3).Infof("Ignore update persistentVolume(%s/%s) status as up to date", pv.Namespace, pv.Name)
return object, nil
}
pv.Status = *newStatus
return helper.ToUnstructured(pv)
}
func aggregatePersistentVolumeClaimStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) {
pvc := &corev1.PersistentVolumeClaim{}
err := helper.ConvertToTypedObject(object, pvc)
if err != nil {
return nil, err
}
newStatus := &corev1.PersistentVolumeClaimStatus{Phase: corev1.ClaimBound}
for _, item := range aggregatedStatusItems {
if item.Status == nil {
continue
}
temp := &corev1.PersistentVolumeClaimStatus{}
if err = json.Unmarshal(item.Status.Raw, temp); err != nil {
return nil, err
}
klog.V(3).Infof("Grab pvc(%s/%s) status from cluster(%s), phase: %s", pvc.Namespace,
pvc.Name, item.ClusterName, temp.Phase)
if temp.Phase == corev1.ClaimLost {
newStatus.Phase = corev1.ClaimLost
break
}
if temp.Phase != corev1.ClaimBound {
newStatus.Phase = temp.Phase
}
}
if reflect.DeepEqual(pvc.Status, *newStatus) {
klog.V(3).Infof("Ignore update pvc(%s/%s) status as up to date", pvc.Namespace, pvc.Name)
return object, nil
}
pvc.Status = *newStatus
return helper.ToUnstructured(pvc)
}
func aggregatePodDisruptionBudgetStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) {
pdb := &policyv1.PodDisruptionBudget{}
err := helper.ConvertToTypedObject(object, pdb)
if err != nil {
return nil, err
}
newStatus := &policyv1.PodDisruptionBudgetStatus{
DisruptedPods: make(map[string]metav1.Time),
}
for _, item := range aggregatedStatusItems {
if item.Status == nil {
continue
}
temp := &policyv1.PodDisruptionBudgetStatus{}
if err = json.Unmarshal(item.Status.Raw, temp); err != nil {
return nil, err
}
klog.V(3).Infof(
"Grab pdb(%s/%s) status from cluster(%s), desired healthy: %d, current healthy: %d, disrupted allowed: %d, expected: %d",
pdb.Namespace, pdb.Name, item.ClusterName,
temp.DesiredHealthy, temp.CurrentHealthy, temp.DisruptionsAllowed, temp.ExpectedPods,
)
newStatus.CurrentHealthy += temp.CurrentHealthy
newStatus.DesiredHealthy += temp.DesiredHealthy
newStatus.ExpectedPods += temp.ExpectedPods
newStatus.DisruptionsAllowed += temp.DisruptionsAllowed
for podName, evictionTime := range temp.DisruptedPods {
newStatus.DisruptedPods[item.ClusterName+"/"+podName] = evictionTime
}
}
if reflect.DeepEqual(pdb.Status, *newStatus) {
klog.V(3).Infof("ignore update pdb(%s/%s) status as up to date", pdb.Namespace, pdb.Name)
return object, nil
}
pdb.Status = *newStatus
return helper.ToUnstructured(pdb)
}
func aggregateHorizontalPodAutoscalerStatus(object *unstructured.Unstructured, aggregatedStatusItems []workv1alpha2.AggregatedStatusItem) (*unstructured.Unstructured, error) {
hpa := &autoscalingv2.HorizontalPodAutoscaler{}
err := helper.ConvertToTypedObject(object, hpa)
if err != nil {
return nil, err
}
newStatus := &autoscalingv2.HorizontalPodAutoscalerStatus{}
for _, item := range aggregatedStatusItems {
if item.Status == nil {
continue
}
temp := &autoscalingv2.HorizontalPodAutoscalerStatus{}
if err = json.Unmarshal(item.Status.Raw, temp); err != nil {
return nil, err
}
klog.V(3).Infof("Grab hpa(%s/%s) status from cluster(%s), CurrentReplicas: %d, DesiredReplicas: %d",
hpa.Namespace, hpa.Name, item.ClusterName, temp.CurrentReplicas, temp.DesiredReplicas)
newStatus.CurrentReplicas += temp.CurrentReplicas
newStatus.DesiredReplicas += temp.DesiredReplicas
}
if reflect.DeepEqual(hpa.Status, *newStatus) {
klog.V(3).Infof("ignore update hpa(%s/%s) status as up to date", hpa.Namespace, hpa.Name)
return object, nil
}
hpa.Status = *newStatus
return helper.ToUnstructured(hpa)
}