Aggregate status to resourcebinding (#221)

Signed-off-by: changzhen <changzhen5@huawei.com>
This commit is contained in:
Zhen Chang 2021-03-17 18:17:46 +08:00 committed by GitHub
parent 696eaa919b
commit da34779efb
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 200 additions and 25 deletions

View File

@ -2,17 +2,26 @@ package binding
import ( import (
"context" "context"
"reflect"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1" apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/api/meta" "k8s.io/apimachinery/pkg/api/meta"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/dynamic" "k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/record" "k8s.io/client-go/tools/record"
"k8s.io/klog/v2" "k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime" controllerruntime "sigs.k8s.io/controller-runtime"
"sigs.k8s.io/controller-runtime/pkg/builder"
"sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper" "github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/overridemanager" "github.com/karmada-io/karmada/pkg/util/overridemanager"
) )
@ -53,7 +62,7 @@ func (c *ResourceBindingController) Reconcile(req controllerruntime.Request) (co
isReady := helper.IsBindingReady(binding.Spec.Clusters) isReady := helper.IsBindingReady(binding.Spec.Clusters)
if !isReady { if !isReady {
klog.Infof("ResourceBinding %s/%s is not ready to sync", binding.GetNamespace(), binding.GetName()) klog.Infof("ResourceBinding(%s/%s) is not ready to sync", binding.GetNamespace(), binding.GetName())
return controllerruntime.Result{}, nil return controllerruntime.Result{}, nil
} }
@ -65,28 +74,35 @@ func (c *ResourceBindingController) syncBinding(binding *workv1alpha1.ResourceBi
clusterNames := helper.GetBindingClusterNames(binding.Spec.Clusters) clusterNames := helper.GetBindingClusterNames(binding.Spec.Clusters)
works, err := helper.FindOrphanWorks(c.Client, binding.Namespace, binding.Name, clusterNames, apiextensionsv1.NamespaceScoped) works, err := helper.FindOrphanWorks(c.Client, binding.Namespace, binding.Name, clusterNames, apiextensionsv1.NamespaceScoped)
if err != nil { if err != nil {
klog.Errorf("Failed to find orphan works by resourceBinding %s/%s. Error: %v.", klog.Errorf("Failed to find orphan works by resourceBinding(%s/%s). Error: %v.",
binding.GetNamespace(), binding.GetName(), err) binding.GetNamespace(), binding.GetName(), err)
return controllerruntime.Result{Requeue: true}, err return controllerruntime.Result{Requeue: true}, err
} }
err = helper.RemoveOrphanWorks(c.Client, works) err = helper.RemoveOrphanWorks(c.Client, works)
if err != nil { if err != nil {
klog.Errorf("Failed to remove orphan works by resourceBinding %s/%s. Error: %v.", klog.Errorf("Failed to remove orphan works by resourceBinding(%s/%s). Error: %v.",
binding.GetNamespace(), binding.GetName(), err) binding.GetNamespace(), binding.GetName(), err)
return controllerruntime.Result{Requeue: true}, err return controllerruntime.Result{Requeue: true}, err
} }
workload, err := helper.FetchWorkload(c.DynamicClient, c.RESTMapper, binding.Spec.Resource) workload, err := helper.FetchWorkload(c.DynamicClient, c.RESTMapper, binding.Spec.Resource)
if err != nil { if err != nil {
klog.Errorf("Failed to fetch workload for resourceBinding %s/%s. Error: %v.", klog.Errorf("Failed to fetch workload for resourceBinding(%s/%s). Error: %v.",
binding.GetNamespace(), binding.GetName(), err) binding.GetNamespace(), binding.GetName(), err)
return controllerruntime.Result{Requeue: true}, err return controllerruntime.Result{Requeue: true}, err
} }
err = helper.EnsureWork(c.Client, workload, clusterNames, c.OverrideManager, binding, apiextensionsv1.NamespaceScoped) err = helper.EnsureWork(c.Client, workload, clusterNames, c.OverrideManager, binding, apiextensionsv1.NamespaceScoped)
if err != nil { if err != nil {
klog.Errorf("Failed to transform resourceBinding %s/%s to works. Error: %v.", klog.Errorf("Failed to transform resourceBinding(%s/%s) to works. Error: %v.",
binding.GetNamespace(), binding.GetName(), err)
return controllerruntime.Result{Requeue: true}, err
}
err = helper.AggregateResourceBindingWorkStatus(c.Client, binding, workload)
if err != nil {
klog.Errorf("Failed to aggregate workStatuses to resourceBinding(%s/%s). Error: %v.",
binding.GetNamespace(), binding.GetName(), err) binding.GetNamespace(), binding.GetName(), err)
return controllerruntime.Result{Requeue: true}, err return controllerruntime.Result{Requeue: true}, err
} }
@ -96,5 +112,52 @@ func (c *ResourceBindingController) syncBinding(binding *workv1alpha1.ResourceBi
// SetupWithManager creates a controller and register to controller manager. // SetupWithManager creates a controller and register to controller manager.
func (c *ResourceBindingController) SetupWithManager(mgr controllerruntime.Manager) error { func (c *ResourceBindingController) SetupWithManager(mgr controllerruntime.Manager) error {
return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.ResourceBinding{}).Complete(c) workFn := handler.ToRequestsFunc(
func(a handler.MapObject) []reconcile.Request {
var requests []reconcile.Request
labels := a.Meta.GetLabels()
namespacesName := types.NamespacedName{
Namespace: labels[util.ResourceBindingNamespaceLabel],
Name: labels[util.ResourceBindingNameLabel],
}
requests = append(requests, reconcile.Request{NamespacedName: namespacesName})
return requests
})
predicateFn := builder.WithPredicates(predicate.Funcs{
CreateFunc: func(e event.CreateEvent) bool {
return false
},
UpdateFunc: func(e event.UpdateEvent) bool {
var statusesOld, statusesNew []workv1alpha1.ManifestStatus
switch oldWork := e.ObjectOld.(type) {
case *workv1alpha1.Work:
statusesOld = oldWork.Status.ManifestStatuses
default:
return false
}
switch newWork := e.ObjectNew.(type) {
case *workv1alpha1.Work:
statusesNew = newWork.Status.ManifestStatuses
default:
return false
}
return !reflect.DeepEqual(statusesOld, statusesNew)
},
DeleteFunc: func(event.DeleteEvent) bool {
return false
},
GenericFunc: func(event.GenericEvent) bool {
return false
},
})
return controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha1.ResourceBinding{}).
Watches(&source.Kind{Type: &workv1alpha1.Work{}}, &handler.EnqueueRequestsFromMapFunc{ToRequests: workFn}, predicateFn).
Complete(c)
} }

View File

@ -22,6 +22,7 @@ import (
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1" workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/util" "github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/helper"
"github.com/karmada-io/karmada/pkg/util/informermanager" "github.com/karmada-io/karmada/pkg/util/informermanager"
"github.com/karmada-io/karmada/pkg/util/names" "github.com/karmada-io/karmada/pkg/util/names"
"github.com/karmada-io/karmada/pkg/util/objectwatcher" "github.com/karmada-io/karmada/pkg/util/objectwatcher"
@ -246,7 +247,7 @@ func (c *WorkStatusController) reflectStatus(work *workv1alpha1.Work, clusterObj
} }
func (c *WorkStatusController) buildStatusIdentifier(work *workv1alpha1.Work, clusterObj *unstructured.Unstructured) (*workv1alpha1.ResourceIdentifier, error) { func (c *WorkStatusController) buildStatusIdentifier(work *workv1alpha1.Work, clusterObj *unstructured.Unstructured) (*workv1alpha1.ResourceIdentifier, error) {
ordinal, err := c.getManifestIndex(work.Spec.Workload.Manifests, clusterObj) ordinal, err := helper.GetManifestIndex(work.Spec.Workload.Manifests, clusterObj)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -289,24 +290,6 @@ func (c *WorkStatusController) mergeStatus(statuses []workv1alpha1.ManifestStatu
return []workv1alpha1.ManifestStatus{newStatus} return []workv1alpha1.ManifestStatus{newStatus}
} }
func (c *WorkStatusController) getManifestIndex(manifests []workv1alpha1.Manifest, clusterObj *unstructured.Unstructured) (int, error) {
for index, rawManifest := range manifests {
manifest := &unstructured.Unstructured{}
if err := manifest.UnmarshalJSON(rawManifest.Raw); err != nil {
return -1, err
}
if manifest.GetAPIVersion() == clusterObj.GetAPIVersion() &&
manifest.GetKind() == clusterObj.GetKind() &&
manifest.GetNamespace() == clusterObj.GetNamespace() &&
manifest.GetName() == clusterObj.GetName() {
return index, nil
}
}
return -1, fmt.Errorf("no such manifest exist")
}
func (c *WorkStatusController) getRawManifest(manifests []workv1alpha1.Manifest, clusterObj *unstructured.Unstructured) (*unstructured.Unstructured, error) { func (c *WorkStatusController) getRawManifest(manifests []workv1alpha1.Manifest, clusterObj *unstructured.Unstructured) (*unstructured.Unstructured, error) {
for _, rawManifest := range manifests { for _, rawManifest := range manifests {
manifest := &unstructured.Unstructured{} manifest := &unstructured.Unstructured{}

View File

@ -0,0 +1,129 @@
package helper
import (
"context"
"fmt"
"reflect"
apiextensionsv1 "k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/klog/v2"
"sigs.k8s.io/controller-runtime/pkg/client"
workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
"github.com/karmada-io/karmada/pkg/util"
"github.com/karmada-io/karmada/pkg/util/names"
)
// AggregateResourceBindingWorkStatus will collect all work statuses with current ResourceBinding objects,
// then aggregate status info to current ResourceBinding status.
func AggregateResourceBindingWorkStatus(c client.Client, binding *workv1alpha1.ResourceBinding, workload *unstructured.Unstructured) error {
aggregatedStatuses, err := assembleWorkStatus(c, binding.Namespace, binding.Name, workload, apiextensionsv1.NamespaceScoped)
if err != nil {
return err
}
if reflect.DeepEqual(binding.Status.AggregatedStatus, aggregatedStatuses) {
klog.Infof("New aggregatedStatuses are equal with old resourceBinding(%s/%s) AggregatedStatus, no update required.",
binding.Namespace, binding.Name)
return nil
}
binding.Status.AggregatedStatus = aggregatedStatuses
err = c.Status().Update(context.TODO(), binding)
if err != nil {
klog.Errorf("Failed update resourceBinding(%s/%s). Error: %v.", binding.Namespace, binding.Name, err)
return err
}
klog.Infof("Update resourceBinding(%s/%s) with AggregatedStatus successfully.", binding.Namespace, binding.Name)
return nil
}
func assembleWorkStatus(c client.Client, bindingNamespace, bindingName string, workload *unstructured.Unstructured,
scope apiextensionsv1.ResourceScope) ([]workv1alpha1.AggregatedStatusItem, error) {
workList := &workv1alpha1.WorkList{}
switch scope {
case apiextensionsv1.NamespaceScoped:
selector := labels.SelectorFromSet(labels.Set{
util.ResourceBindingNamespaceLabel: bindingNamespace,
util.ResourceBindingNameLabel: bindingName,
})
if err := c.List(context.TODO(), workList, &client.ListOptions{LabelSelector: selector}); err != nil {
return nil, err
}
}
statuses := make([]workv1alpha1.AggregatedStatusItem, 0)
for _, work := range workList.Items {
identifierIndex, err := GetManifestIndex(work.Spec.Workload.Manifests, workload)
if err != nil {
klog.Errorf("Failed to get manifestIndex of workload in work.Spec.Workload.Manifests. Error: %v.", err)
return nil, err
}
for _, manifestStatus := range work.Status.ManifestStatuses {
equal, err := equalIdentifier(&manifestStatus.Identifier, identifierIndex, workload)
if err != nil {
return nil, err
}
if equal {
clusterName, err := names.GetClusterName(work.Namespace)
if err != nil {
klog.Errorf("Failed to get clusterName from work namespace %s. Error: %v.", work.Namespace, err)
return nil, err
}
aggregatedStatus := workv1alpha1.AggregatedStatusItem{
ClusterName: clusterName,
Status: manifestStatus.Status,
}
statuses = append(statuses, aggregatedStatus)
break
}
}
}
return statuses, nil
}
// GetManifestIndex get the index of clusterObj in manifest list, if not exist return -1.
func GetManifestIndex(manifests []workv1alpha1.Manifest, clusterObj *unstructured.Unstructured) (int, error) {
for index, rawManifest := range manifests {
manifest := &unstructured.Unstructured{}
if err := manifest.UnmarshalJSON(rawManifest.Raw); err != nil {
return -1, err
}
if manifest.GetAPIVersion() == clusterObj.GetAPIVersion() &&
manifest.GetKind() == clusterObj.GetKind() &&
manifest.GetNamespace() == clusterObj.GetNamespace() &&
manifest.GetName() == clusterObj.GetName() {
return index, nil
}
}
return -1, fmt.Errorf("no such manifest exist")
}
func equalIdentifier(targetIdentifier *workv1alpha1.ResourceIdentifier, ordinal int, workload *unstructured.Unstructured) (bool, error) {
groupVersion, err := schema.ParseGroupVersion(workload.GetAPIVersion())
if err != nil {
return false, err
}
if targetIdentifier.Ordinal == ordinal &&
targetIdentifier.Group == groupVersion.Group &&
targetIdentifier.Version == groupVersion.Version &&
targetIdentifier.Kind == workload.GetKind() &&
targetIdentifier.Namespace == workload.GetNamespace() &&
targetIdentifier.Name == workload.GetName() {
return true, nil
}
return false, nil
}