Merge pull request #1301 from mrlihanbo/dependencies
implementation of automatically propagate dependencies
This commit is contained in:
commit
d6355ec852
|
@ -35,7 +35,9 @@ import (
|
|||
"github.com/karmada-io/karmada/pkg/controllers/namespace"
|
||||
"github.com/karmada-io/karmada/pkg/controllers/status"
|
||||
"github.com/karmada-io/karmada/pkg/controllers/unifiedauth"
|
||||
"github.com/karmada-io/karmada/pkg/dependenciesdistributor"
|
||||
"github.com/karmada-io/karmada/pkg/detector"
|
||||
"github.com/karmada-io/karmada/pkg/features"
|
||||
"github.com/karmada-io/karmada/pkg/karmadactl"
|
||||
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
|
||||
"github.com/karmada-io/karmada/pkg/util"
|
||||
|
@ -395,6 +397,19 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
|
|||
klog.Fatalf("Failed to setup resource detector: %v", err)
|
||||
}
|
||||
|
||||
if features.FeatureGate.Enabled(features.PropagateDeps) {
|
||||
dependenciesDistributor := &dependenciesdistributor.DependenciesDistributor{
|
||||
Client: mgr.GetClient(),
|
||||
DynamicClient: dynamicClientSet,
|
||||
InformerManager: controlPlaneInformerManager,
|
||||
ResourceInterpreter: resourceInterpreter,
|
||||
RESTMapper: mgr.GetRESTMapper(),
|
||||
}
|
||||
if err := mgr.Add(dependenciesDistributor); err != nil {
|
||||
klog.Fatalf("Failed to setup dependencies distributor: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
setupClusterAPIClusterDetector(mgr, opts, stopChan)
|
||||
controllerContext := controllerscontext.Context{
|
||||
Mgr: mgr,
|
||||
|
|
|
@ -10,6 +10,7 @@ import (
|
|||
"k8s.io/client-go/tools/leaderelection/resourcelock"
|
||||
componentbaseconfig "k8s.io/component-base/config"
|
||||
|
||||
"github.com/karmada-io/karmada/pkg/features"
|
||||
"github.com/karmada-io/karmada/pkg/util"
|
||||
)
|
||||
|
||||
|
@ -157,4 +158,5 @@ func (o *Options) AddFlags(flags *pflag.FlagSet, allControllers []string) {
|
|||
flags.IntVar(&o.ConcurrentWorkSyncs, "concurrent-work-syncs", 5, "The number of Works that are allowed to sync concurrently.")
|
||||
flags.IntVar(&o.ConcurrentNamespaceSyncs, "concurrent-namespace-syncs", 1, "The number of Namespaces that are allowed to sync concurrently.")
|
||||
flags.IntVar(&o.ConcurrentResourceTemplateSyncs, "concurrent-resource-template-syncs", 5, "The number of resource templates that are allowed to sync concurrently.")
|
||||
features.FeatureGate.AddFlag(flags)
|
||||
}
|
||||
|
|
|
@ -88,7 +88,7 @@ func (c *ResourceBindingController) removeFinalizer(rb *workv1alpha2.ResourceBin
|
|||
|
||||
// syncBinding will sync resourceBinding to Works.
|
||||
func (c *ResourceBindingController) syncBinding(binding *workv1alpha2.ResourceBinding) (controllerruntime.Result, error) {
|
||||
clusterNames := helper.GetBindingClusterNames(binding.Spec.Clusters)
|
||||
clusterNames := helper.GetBindingClusterNames(binding.Spec.Clusters, binding.Spec.RequiredBy)
|
||||
works, err := helper.FindOrphanWorks(c.Client, binding.Namespace, binding.Name, clusterNames, apiextensionsv1.NamespaceScoped)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to find orphan works by resourceBinding(%s/%s). Error: %v.",
|
||||
|
|
|
@ -88,7 +88,7 @@ func (c *ClusterResourceBindingController) removeFinalizer(crb *workv1alpha2.Clu
|
|||
|
||||
// syncBinding will sync clusterResourceBinding to Works.
|
||||
func (c *ClusterResourceBindingController) syncBinding(binding *workv1alpha2.ClusterResourceBinding) (controllerruntime.Result, error) {
|
||||
clusterNames := helper.GetBindingClusterNames(binding.Spec.Clusters)
|
||||
clusterNames := helper.GetBindingClusterNames(binding.Spec.Clusters, binding.Spec.RequiredBy)
|
||||
works, err := helper.FindOrphanWorks(c.Client, "", binding.Name, clusterNames, apiextensionsv1.ClusterScoped)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to find orphan works by ClusterResourceBinding(%s). Error: %v.", binding.GetName(), err)
|
||||
|
|
|
@ -59,15 +59,20 @@ func ensureWork(
|
|||
overrideManager overridemanager.OverrideManager, binding metav1.Object, scope apiextensionsv1.ResourceScope,
|
||||
) error {
|
||||
var targetClusters []workv1alpha2.TargetCluster
|
||||
var requiredByBindingSnapshot []workv1alpha2.BindingSnapshot
|
||||
switch scope {
|
||||
case apiextensionsv1.NamespaceScoped:
|
||||
bindingObj := binding.(*workv1alpha2.ResourceBinding)
|
||||
targetClusters = bindingObj.Spec.Clusters
|
||||
requiredByBindingSnapshot = bindingObj.Spec.RequiredBy
|
||||
case apiextensionsv1.ClusterScoped:
|
||||
bindingObj := binding.(*workv1alpha2.ClusterResourceBinding)
|
||||
targetClusters = bindingObj.Spec.Clusters
|
||||
requiredByBindingSnapshot = bindingObj.Spec.RequiredBy
|
||||
}
|
||||
|
||||
targetClusters = mergeTargetClusters(targetClusters, requiredByBindingSnapshot)
|
||||
|
||||
var jobCompletions []workv1alpha2.TargetCluster
|
||||
var err error
|
||||
if workload.GetKind() == util.JobKind {
|
||||
|
@ -140,6 +145,25 @@ func ensureWork(
|
|||
return nil
|
||||
}
|
||||
|
||||
func mergeTargetClusters(targetClusters []workv1alpha2.TargetCluster, requiredByBindingSnapshot []workv1alpha2.BindingSnapshot) []workv1alpha2.TargetCluster {
|
||||
if len(requiredByBindingSnapshot) == 0 {
|
||||
return targetClusters
|
||||
}
|
||||
|
||||
scheduledClusterNames := util.ConvertToClusterNames(targetClusters)
|
||||
|
||||
for _, requiredByBinding := range requiredByBindingSnapshot {
|
||||
for _, targetCluster := range requiredByBinding.Clusters {
|
||||
if !scheduledClusterNames.Has(targetCluster.Name) {
|
||||
scheduledClusterNames.Insert(targetCluster.Name)
|
||||
targetClusters = append(targetClusters, targetCluster)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return targetClusters
|
||||
}
|
||||
|
||||
func getReplicaInfos(targetClusters []workv1alpha2.TargetCluster) (bool, map[string]int64) {
|
||||
if helper.HasScheduledReplica(targetClusters) {
|
||||
return true, transScheduleResultToMap(targetClusters)
|
||||
|
|
|
@ -0,0 +1,633 @@
|
|||
package dependenciesdistributor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sort"
|
||||
"time"
|
||||
|
||||
corev1 "k8s.io/api/core/v1"
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
||||
"k8s.io/apimachinery/pkg/api/meta"
|
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
"k8s.io/apimachinery/pkg/labels"
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/runtime/schema"
|
||||
utilerrors "k8s.io/apimachinery/pkg/util/errors"
|
||||
"k8s.io/apimachinery/pkg/util/sets"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/dynamic"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/retry"
|
||||
"k8s.io/klog/v2"
|
||||
"sigs.k8s.io/controller-runtime/pkg/client"
|
||||
"sigs.k8s.io/controller-runtime/pkg/manager"
|
||||
|
||||
configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
|
||||
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
|
||||
"github.com/karmada-io/karmada/pkg/detector"
|
||||
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
|
||||
"github.com/karmada-io/karmada/pkg/util"
|
||||
"github.com/karmada-io/karmada/pkg/util/helper"
|
||||
"github.com/karmada-io/karmada/pkg/util/informermanager"
|
||||
"github.com/karmada-io/karmada/pkg/util/informermanager/keys"
|
||||
"github.com/karmada-io/karmada/pkg/util/names"
|
||||
)
|
||||
|
||||
const (
|
||||
// bindingDependedByLabelKeyPrefix is the prefix to a label key specifying an attached binding referred by which independent binding.
|
||||
// the key is in the label of an attached binding which should be unique, because resource like secret can be referred by multiple deployments.
|
||||
bindingDependedByLabelKeyPrefix = "resourcebinding.karmada.io/depended-by-"
|
||||
// bindingDependenciesAnnotationKey represents the key of dependencies data (json serialized)
|
||||
// in the annotations of an independent binding.
|
||||
bindingDependenciesAnnotationKey = "resourcebinding.karmada.io/dependencies"
|
||||
)
|
||||
|
||||
var supportedTypes = []schema.GroupVersionResource{
|
||||
corev1.SchemeGroupVersion.WithResource("configmaps"),
|
||||
corev1.SchemeGroupVersion.WithResource("secrets"),
|
||||
}
|
||||
|
||||
// DependenciesDistributor is to automatically propagate relevant resources.
|
||||
// Resource binding will be created when a resource(e.g. deployment) is matched by a propagation policy, we call it independent binding in DependenciesDistributor.
|
||||
// And when DependenciesDistributor works, it will create or update reference resource bindings of relevant resources(e.g. secret), which we call them attached bindings.
|
||||
type DependenciesDistributor struct {
|
||||
// Client is used to retrieve objects, it is often more convenient than lister.
|
||||
Client client.Client
|
||||
// DynamicClient used to fetch arbitrary resources.
|
||||
DynamicClient dynamic.Interface
|
||||
|
||||
InformerManager informermanager.SingleClusterInformerManager
|
||||
EventHandler cache.ResourceEventHandler
|
||||
Processor util.AsyncWorker
|
||||
RESTMapper meta.RESTMapper
|
||||
ResourceInterpreter resourceinterpreter.ResourceInterpreter
|
||||
|
||||
// bindingReconcileWorker maintains a rate limited queue which used to store ResourceBinding's key and
|
||||
// a reconcile function to consume the items in queue.
|
||||
bindingReconcileWorker util.AsyncWorker
|
||||
resourceBindingLister cache.GenericLister
|
||||
|
||||
stopCh <-chan struct{}
|
||||
}
|
||||
|
||||
// Start runs the distributor, never stop until stopCh closed.
|
||||
func (d *DependenciesDistributor) Start(ctx context.Context) error {
|
||||
klog.Infof("Starting dependencies distributor.")
|
||||
d.stopCh = ctx.Done()
|
||||
|
||||
// setup binding reconcile worker
|
||||
d.bindingReconcileWorker = util.NewAsyncWorker("resourceBinding reconciler", detector.ClusterWideKeyFunc, d.ReconcileResourceBinding)
|
||||
d.bindingReconcileWorker.Run(2, d.stopCh)
|
||||
|
||||
// watch and enqueue ResourceBinding changes.
|
||||
resourceBindingGVR := schema.GroupVersionResource{
|
||||
Group: workv1alpha2.GroupVersion.Group,
|
||||
Version: workv1alpha2.GroupVersion.Version,
|
||||
Resource: "resourcebindings",
|
||||
}
|
||||
|
||||
bindingHandler := informermanager.NewHandlerOnEvents(nil, d.OnResourceBindingUpdate, d.OnResourceBindingDelete)
|
||||
d.InformerManager.ForResource(resourceBindingGVR, bindingHandler)
|
||||
d.resourceBindingLister = d.InformerManager.Lister(resourceBindingGVR)
|
||||
|
||||
d.EventHandler = informermanager.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete)
|
||||
d.Processor = util.NewAsyncWorker("resource detector", detector.ClusterWideKeyFunc, d.Reconcile)
|
||||
d.Processor.Run(2, d.stopCh)
|
||||
go d.discoverResources(30 * time.Second)
|
||||
|
||||
<-d.stopCh
|
||||
|
||||
klog.Infof("Stopped as stopCh closed.")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Check if our DependenciesDistributor implements necessary interfaces
|
||||
var _ manager.Runnable = &DependenciesDistributor{}
|
||||
var _ manager.LeaderElectionRunnable = &DependenciesDistributor{}
|
||||
|
||||
// NeedLeaderElection implements LeaderElectionRunnable interface.
|
||||
// So that the distributor could run in the leader election mode.
|
||||
func (d *DependenciesDistributor) NeedLeaderElection() bool {
|
||||
return true
|
||||
}
|
||||
|
||||
func (d *DependenciesDistributor) discoverResources(period time.Duration) {
|
||||
wait.Until(func() {
|
||||
for _, gvr := range supportedTypes {
|
||||
if d.InformerManager.IsHandlerExist(gvr, d.EventHandler) {
|
||||
continue
|
||||
}
|
||||
klog.Infof("Setup informer for %s", gvr.String())
|
||||
d.InformerManager.ForResource(gvr, d.EventHandler)
|
||||
}
|
||||
d.InformerManager.Start()
|
||||
}, period, d.stopCh)
|
||||
}
|
||||
|
||||
// OnAdd handles object add event and push the object to queue.
|
||||
func (d *DependenciesDistributor) OnAdd(obj interface{}) {
|
||||
runtimeObj, ok := obj.(runtime.Object)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
d.Processor.Enqueue(runtimeObj)
|
||||
}
|
||||
|
||||
// OnUpdate handles object update event and push the object to queue.
|
||||
func (d *DependenciesDistributor) OnUpdate(oldObj, newObj interface{}) {
|
||||
d.OnAdd(newObj)
|
||||
}
|
||||
|
||||
// OnDelete handles object delete event and push the object to queue.
|
||||
func (d *DependenciesDistributor) OnDelete(obj interface{}) {
|
||||
d.OnAdd(obj)
|
||||
}
|
||||
|
||||
// Reconcile performs a full reconciliation for the object referred to by the key.
|
||||
// The key will be re-queued if an error is non-nil.
|
||||
func (d *DependenciesDistributor) Reconcile(key util.QueueKey) error {
|
||||
clusterWideKey, ok := key.(keys.ClusterWideKey)
|
||||
if !ok {
|
||||
klog.Error("invalid key")
|
||||
return fmt.Errorf("invalid key")
|
||||
}
|
||||
klog.Infof("DependenciesDistributor start to reconcile object: %s", clusterWideKey)
|
||||
|
||||
bindingObjectList, err := d.resourceBindingLister.ByNamespace(clusterWideKey.Namespace).List(labels.Everything())
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
bindingList, err := convertObjectsToResourceBindings(bindingObjectList)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var errs []error
|
||||
for _, binding := range bindingList {
|
||||
if !binding.DeletionTimestamp.IsZero() {
|
||||
continue
|
||||
}
|
||||
|
||||
matched, err := dependentObjectReferenceMatches(clusterWideKey, binding)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to evaluate if binding(%s/%s) need to sync dependencies: %v", binding.Namespace, binding.Name, err)
|
||||
errs = append(errs, err)
|
||||
continue
|
||||
} else if !matched {
|
||||
klog.V(4).Infof("no need to sync binding(%s/%s)", binding.Namespace, binding.Name)
|
||||
continue
|
||||
}
|
||||
|
||||
bindingKey, err := detector.ClusterWideKeyFunc(binding)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to generate cluster wide key for binding %s/%s: %v", binding.Namespace, binding.Name, err)
|
||||
errs = append(errs, err)
|
||||
continue
|
||||
}
|
||||
|
||||
d.bindingReconcileWorker.Add(bindingKey)
|
||||
}
|
||||
|
||||
return utilerrors.NewAggregate(errs)
|
||||
}
|
||||
|
||||
// dependentObjectReferenceMatches tells if the given object is referred by current resource binding.
|
||||
func dependentObjectReferenceMatches(objectKey keys.ClusterWideKey, referenceBinding *workv1alpha2.ResourceBinding) (bool, error) {
|
||||
dependencies, exist := referenceBinding.Annotations[bindingDependenciesAnnotationKey]
|
||||
if !exist {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
var dependenciesSlice []configv1alpha1.DependentObjectReference
|
||||
err := json.Unmarshal([]byte(dependencies), &dependenciesSlice)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
|
||||
if len(dependenciesSlice) == 0 {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
for _, dependence := range dependenciesSlice {
|
||||
if objectKey.Version == dependence.APIVersion &&
|
||||
objectKey.Kind == dependence.Kind &&
|
||||
objectKey.Namespace == dependence.Namespace &&
|
||||
objectKey.Name == dependence.Name {
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// OnResourceBindingUpdate handles object update event and push the object to queue.
|
||||
func (d *DependenciesDistributor) OnResourceBindingUpdate(oldObj, newObj interface{}) {
|
||||
unstructuredOldObj, ok := oldObj.(*unstructured.Unstructured)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
oldBindingObject, err := helper.ConvertToResourceBinding(unstructuredOldObj)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
unstructuredNewObj, ok := newObj.(*unstructured.Unstructured)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
newBindingObject, err := helper.ConvertToResourceBinding(unstructuredNewObj)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if oldBindingObject.Generation == newBindingObject.Generation {
|
||||
return
|
||||
}
|
||||
|
||||
// prevent newBindingObject from the queue if it's not in Scheduled condition
|
||||
if !helper.IsBindingScheduled(&newBindingObject.Status) {
|
||||
return
|
||||
}
|
||||
|
||||
// in case users set PropagateDeps field from "true" to "false"
|
||||
// in case users set PropagateDeps field from "false" to "true"
|
||||
if oldBindingObject.Spec.PropagateDeps || newBindingObject.Spec.PropagateDeps {
|
||||
key, err := detector.ClusterWideKeyFunc(newObj)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
d.bindingReconcileWorker.Add(key)
|
||||
}
|
||||
}
|
||||
|
||||
// OnResourceBindingDelete handles object delete event and push the object to queue.
|
||||
func (d *DependenciesDistributor) OnResourceBindingDelete(obj interface{}) {
|
||||
unstructuredObj, ok := obj.(*unstructured.Unstructured)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
|
||||
bindingObject, err := helper.ConvertToResourceBinding(unstructuredObj)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
if !bindingObject.Spec.PropagateDeps {
|
||||
return
|
||||
}
|
||||
|
||||
key, err := detector.ClusterWideKeyFunc(obj)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
d.bindingReconcileWorker.Add(key)
|
||||
}
|
||||
|
||||
// ReconcileResourceBinding handles ResourceBinding object changes.
|
||||
func (d *DependenciesDistributor) ReconcileResourceBinding(key util.QueueKey) error {
|
||||
ckey, ok := key.(keys.ClusterWideKey)
|
||||
if !ok { // should not happen
|
||||
klog.Error("Found invalid key when reconciling resource binding.")
|
||||
return fmt.Errorf("invalid key")
|
||||
}
|
||||
|
||||
unstructuredObj, err := d.resourceBindingLister.Get(ckey.NamespaceKey())
|
||||
if err != nil {
|
||||
if apierrors.IsNotFound(err) {
|
||||
klog.V(4).Infof("ResourceBinding(%s) has been removed.", ckey.NamespaceKey())
|
||||
return d.handleResourceBindingDeletion(ckey)
|
||||
}
|
||||
klog.Errorf("Failed to get ResourceBinding(%s): %v", ckey.NamespaceKey(), err)
|
||||
return err
|
||||
}
|
||||
|
||||
bindingObject, err := helper.ConvertToResourceBinding(unstructuredObj.(*unstructured.Unstructured))
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to convert ResourceBinding(%s) from unstructured object: %v", ckey.NamespaceKey(), err)
|
||||
return err
|
||||
}
|
||||
|
||||
// in case users set PropagateDeps field from "true" to "false"
|
||||
if !bindingObject.Spec.PropagateDeps || !bindingObject.DeletionTimestamp.IsZero() {
|
||||
return d.handleResourceBindingDeletion(ckey)
|
||||
}
|
||||
|
||||
workload, err := helper.FetchWorkload(d.DynamicClient, d.InformerManager, d.RESTMapper, bindingObject.Spec.Resource)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to fetch workload for resourceBinding(%s/%s). Error: %v.", bindingObject.Namespace, bindingObject.Name, err)
|
||||
return err
|
||||
}
|
||||
|
||||
if !d.ResourceInterpreter.HookEnabled(workload.GroupVersionKind(), configv1alpha1.InterpreterOperationInterpretDependency) {
|
||||
return nil
|
||||
}
|
||||
|
||||
dependencies, err := d.ResourceInterpreter.GetDependencies(workload)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to customize dependencies for %s(%s), %v", workload.GroupVersionKind(), workload.GetName(), err)
|
||||
return err
|
||||
}
|
||||
|
||||
return d.syncScheduleResultToAttachedBindings(bindingObject, dependencies)
|
||||
}
|
||||
|
||||
func (d *DependenciesDistributor) handleResourceBindingDeletion(bindingKey keys.ClusterWideKey) error {
|
||||
attachedBindings, err := d.listAttachedBindings(bindingKey.Namespace, bindingKey.Name)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return d.removeScheduleResultFromAttachedBindings(bindingKey.Namespace, bindingKey.Name, attachedBindings)
|
||||
}
|
||||
|
||||
func (d *DependenciesDistributor) syncScheduleResultToAttachedBindings(binding *workv1alpha2.ResourceBinding, dependencies []configv1alpha1.DependentObjectReference) error {
|
||||
if err := d.recordDependenciesForIndependentBinding(binding, dependencies); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// remove orphan attached bindings
|
||||
orphanBindings, err := d.findOrphanAttachedResourceBindings(binding, dependencies)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to find orphan attached bindings for resourceBinding(%s/%s). Error: %v.",
|
||||
binding.GetNamespace(), binding.GetName(), err)
|
||||
return err
|
||||
}
|
||||
|
||||
err = d.removeScheduleResultFromAttachedBindings(binding.Namespace, binding.Name, orphanBindings)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to remove orphan attached bindings by resourceBinding(%s/%s). Error: %v.",
|
||||
binding.GetNamespace(), binding.GetName(), err)
|
||||
return err
|
||||
}
|
||||
|
||||
// create or update attached bindings
|
||||
var errs []error
|
||||
for _, dependent := range dependencies {
|
||||
resource := workv1alpha2.ObjectReference{
|
||||
APIVersion: dependent.APIVersion,
|
||||
Kind: dependent.Kind,
|
||||
Namespace: dependent.Namespace,
|
||||
Name: dependent.Name,
|
||||
}
|
||||
|
||||
rawObject, err := helper.FetchWorkload(d.DynamicClient, d.InformerManager, d.RESTMapper, resource)
|
||||
if err != nil {
|
||||
// do nothing if resource template not exist.
|
||||
if apierrors.IsNotFound(err) {
|
||||
continue
|
||||
}
|
||||
errs = append(errs, err)
|
||||
continue
|
||||
}
|
||||
|
||||
attachedBinding := buildAttachedBinding(binding, rawObject)
|
||||
if err := d.createOrUpdateAttachedBinding(attachedBinding); err != nil {
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
|
||||
return utilerrors.NewAggregate(errs)
|
||||
}
|
||||
|
||||
func (d *DependenciesDistributor) recordDependenciesForIndependentBinding(binding *workv1alpha2.ResourceBinding, dependencies []configv1alpha1.DependentObjectReference) error {
|
||||
sort.Slice(dependencies, func(i, j int) bool {
|
||||
if dependencies[i].APIVersion != dependencies[j].APIVersion {
|
||||
return dependencies[i].APIVersion < dependencies[j].APIVersion
|
||||
}
|
||||
if dependencies[i].Kind != dependencies[j].Kind {
|
||||
return dependencies[i].Kind < dependencies[j].Kind
|
||||
}
|
||||
if dependencies[i].Namespace != dependencies[j].Namespace {
|
||||
return dependencies[i].Namespace < dependencies[j].Namespace
|
||||
}
|
||||
if dependencies[i].Name != dependencies[j].Name {
|
||||
return dependencies[i].Name < dependencies[j].Name
|
||||
}
|
||||
return false
|
||||
})
|
||||
|
||||
dependenciesBytes, err := json.Marshal(dependencies)
|
||||
if err != nil {
|
||||
klog.Errorf("failed to marshal dependencies of binding(%s/%s): %v", binding.Namespace, binding.Name, err)
|
||||
return err
|
||||
}
|
||||
|
||||
objectAnnotation := binding.GetAnnotations()
|
||||
if objectAnnotation == nil {
|
||||
objectAnnotation = make(map[string]string, 1)
|
||||
}
|
||||
|
||||
// dependencies are not updated, no need to update annotation.
|
||||
if oldDependencies, exist := objectAnnotation[bindingDependenciesAnnotationKey]; exist && oldDependencies == string(dependenciesBytes) {
|
||||
return nil
|
||||
}
|
||||
|
||||
objectAnnotation[bindingDependenciesAnnotationKey] = string(dependenciesBytes)
|
||||
|
||||
return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
|
||||
binding.SetAnnotations(objectAnnotation)
|
||||
updateErr := d.Client.Update(context.TODO(), binding)
|
||||
if updateErr == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
updated := &workv1alpha2.ResourceBinding{}
|
||||
if err = d.Client.Get(context.TODO(), client.ObjectKey{Namespace: binding.Namespace, Name: binding.Name}, updated); err == nil {
|
||||
//make a copy, so we don't mutate the shared cache
|
||||
binding = updated.DeepCopy()
|
||||
} else {
|
||||
klog.Errorf("failed to get updated binding %s/%s: %v", binding.Namespace, binding.Name, err)
|
||||
}
|
||||
return updateErr
|
||||
})
|
||||
}
|
||||
|
||||
func (d *DependenciesDistributor) findOrphanAttachedResourceBindings(independentBinding *workv1alpha2.ResourceBinding, dependencies []configv1alpha1.DependentObjectReference) ([]*workv1alpha2.ResourceBinding, error) {
|
||||
attachedBindings, err := d.listAttachedBindings(independentBinding.Namespace, independentBinding.Name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
dependenciesSets := sets.NewString()
|
||||
for _, dependency := range dependencies {
|
||||
key := generateDependencyKey(dependency.Kind, dependency.APIVersion, dependency.Namespace, dependency.Name)
|
||||
dependenciesSets.Insert(key)
|
||||
}
|
||||
|
||||
var orphanAttachedBindings []*workv1alpha2.ResourceBinding
|
||||
for _, attachedBinding := range attachedBindings {
|
||||
key := generateDependencyKey(attachedBinding.Spec.Resource.Kind, attachedBinding.Spec.Resource.APIVersion, attachedBinding.Spec.Resource.Namespace, attachedBinding.Spec.Resource.Name)
|
||||
if !dependenciesSets.Has(key) {
|
||||
orphanAttachedBindings = append(orphanAttachedBindings, attachedBinding)
|
||||
}
|
||||
}
|
||||
return orphanAttachedBindings, nil
|
||||
}
|
||||
|
||||
func (d *DependenciesDistributor) listAttachedBindings(bindingNamespace, bindingName string) ([]*workv1alpha2.ResourceBinding, error) {
|
||||
label := generateBindingDependedByLabel(bindingNamespace, bindingName)
|
||||
selector := labels.SelectorFromSet(label)
|
||||
|
||||
attachedBindingList, err := d.resourceBindingLister.List(selector)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return convertObjectsToResourceBindings(attachedBindingList)
|
||||
}
|
||||
|
||||
func generateBindingDependedByLabel(bindingNamespace, bindingName string) map[string]string {
|
||||
labelKey := generateBindingDependedByLabelKey(bindingNamespace, bindingName)
|
||||
labelValue := fmt.Sprintf(bindingNamespace + "_" + bindingName)
|
||||
return map[string]string{labelKey: labelValue}
|
||||
}
|
||||
|
||||
func generateBindingDependedByLabelKey(bindingNamespace, bindingName string) string {
|
||||
bindHashKey := names.GenerateBindingReferenceKey(bindingNamespace, bindingName)
|
||||
return fmt.Sprintf(bindingDependedByLabelKeyPrefix + bindHashKey)
|
||||
}
|
||||
|
||||
func generateDependencyKey(kind, apiVersion, name, namespace string) string {
|
||||
if len(namespace) == 0 {
|
||||
return kind + "-" + apiVersion + "-" + name
|
||||
}
|
||||
|
||||
return kind + "-" + apiVersion + "-" + namespace + "-" + name
|
||||
}
|
||||
|
||||
func convertObjectsToResourceBindings(bindingList []runtime.Object) ([]*workv1alpha2.ResourceBinding, error) {
|
||||
bindings := make([]*workv1alpha2.ResourceBinding, 0, len(bindingList))
|
||||
for _, obj := range bindingList {
|
||||
binding, err := helper.ConvertToResourceBinding(obj.(*unstructured.Unstructured))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to convert unstructured to typed object: %v", err)
|
||||
}
|
||||
bindings = append(bindings, binding)
|
||||
}
|
||||
return bindings, nil
|
||||
}
|
||||
|
||||
func (d *DependenciesDistributor) removeScheduleResultFromAttachedBindings(bindingNamespace, bindingName string, attachedBindings []*workv1alpha2.ResourceBinding) error {
|
||||
if len(attachedBindings) == 0 {
|
||||
return nil
|
||||
}
|
||||
|
||||
bindingLabelKey := generateBindingDependedByLabelKey(bindingNamespace, bindingName)
|
||||
|
||||
var errs []error
|
||||
for index, binding := range attachedBindings {
|
||||
delete(attachedBindings[index].Labels, bindingLabelKey)
|
||||
updatedSnapshot := deleteBindingFromSnapshot(bindingNamespace, bindingName, attachedBindings[index].Spec.RequiredBy)
|
||||
attachedBindings[index].Spec.RequiredBy = updatedSnapshot
|
||||
if err := d.Client.Update(context.TODO(), attachedBindings[index]); err != nil {
|
||||
klog.Errorf("Failed to update binding(%s/%s): %v", binding.Namespace, binding.Name, err)
|
||||
errs = append(errs, err)
|
||||
}
|
||||
}
|
||||
|
||||
if len(errs) > 0 {
|
||||
return utilerrors.NewAggregate(errs)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func buildAttachedBinding(binding *workv1alpha2.ResourceBinding, object *unstructured.Unstructured) *workv1alpha2.ResourceBinding {
|
||||
labels := generateBindingDependedByLabel(binding.Namespace, binding.Name)
|
||||
|
||||
var result []workv1alpha2.BindingSnapshot
|
||||
result = append(result, workv1alpha2.BindingSnapshot{
|
||||
Namespace: binding.Namespace,
|
||||
Name: binding.Name,
|
||||
Clusters: binding.Spec.Clusters,
|
||||
})
|
||||
|
||||
return &workv1alpha2.ResourceBinding{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
Name: names.GenerateBindingName(object.GetKind(), object.GetName()),
|
||||
Namespace: binding.GetNamespace(),
|
||||
OwnerReferences: []metav1.OwnerReference{
|
||||
*metav1.NewControllerRef(object, object.GroupVersionKind()),
|
||||
},
|
||||
Labels: labels,
|
||||
Finalizers: []string{util.BindingControllerFinalizer},
|
||||
},
|
||||
Spec: workv1alpha2.ResourceBindingSpec{
|
||||
Resource: workv1alpha2.ObjectReference{
|
||||
APIVersion: object.GetAPIVersion(),
|
||||
Kind: object.GetKind(),
|
||||
Namespace: object.GetNamespace(),
|
||||
Name: object.GetName(),
|
||||
ResourceVersion: object.GetResourceVersion(),
|
||||
},
|
||||
RequiredBy: result,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (d *DependenciesDistributor) createOrUpdateAttachedBinding(attachedBinding *workv1alpha2.ResourceBinding) error {
|
||||
if err := d.Client.Create(context.TODO(), attachedBinding); err != nil {
|
||||
if !apierrors.IsAlreadyExists(err) {
|
||||
klog.Infof("failed to create resource binding(%s/%s): %v", attachedBinding.Namespace, attachedBinding.Name, err)
|
||||
return err
|
||||
}
|
||||
|
||||
existBinding := &workv1alpha2.ResourceBinding{}
|
||||
key := client.ObjectKeyFromObject(attachedBinding)
|
||||
if err := d.Client.Get(context.TODO(), key, existBinding); err != nil {
|
||||
klog.Infof("failed to get resource binding(%s/%s): %v", attachedBinding.Namespace, attachedBinding.Name, err)
|
||||
return err
|
||||
}
|
||||
|
||||
updatedBindingSnapshot := mergeBindingSnapshot(existBinding.Spec.RequiredBy, attachedBinding.Spec.RequiredBy)
|
||||
existBinding.Spec.RequiredBy = updatedBindingSnapshot
|
||||
existBinding.Labels = util.DedupeAndMergeLabels(existBinding.Labels, attachedBinding.Labels)
|
||||
|
||||
if err := d.Client.Update(context.TODO(), existBinding); err != nil {
|
||||
klog.Errorf("failed to update resource binding(%s/%s): %v", existBinding.Namespace, existBinding.Name, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func mergeBindingSnapshot(existSnapshot, newSnapshot []workv1alpha2.BindingSnapshot) []workv1alpha2.BindingSnapshot {
|
||||
if len(existSnapshot) == 0 {
|
||||
return newSnapshot
|
||||
}
|
||||
|
||||
for _, newBinding := range newSnapshot {
|
||||
existInOldSnapshot := false
|
||||
for i := range existSnapshot {
|
||||
if existSnapshot[i].Namespace == newBinding.Namespace &&
|
||||
existSnapshot[i].Name == newBinding.Name {
|
||||
existSnapshot[i].Clusters = newBinding.Clusters
|
||||
existInOldSnapshot = true
|
||||
}
|
||||
}
|
||||
if !existInOldSnapshot {
|
||||
existSnapshot = append(existSnapshot, newBinding)
|
||||
}
|
||||
}
|
||||
|
||||
return existSnapshot
|
||||
}
|
||||
|
||||
func deleteBindingFromSnapshot(bindingNamespace, bindingName string, existSnapshot []workv1alpha2.BindingSnapshot) []workv1alpha2.BindingSnapshot {
|
||||
for i := 0; i < len(existSnapshot); i++ {
|
||||
if existSnapshot[i].Namespace == bindingNamespace &&
|
||||
existSnapshot[i].Name == bindingName {
|
||||
existSnapshot = append(existSnapshot[:i], existSnapshot[i+1:]...)
|
||||
i--
|
||||
}
|
||||
}
|
||||
return existSnapshot
|
||||
}
|
|
@ -447,7 +447,7 @@ func (d *ResourceDetector) ApplyPolicy(object *unstructured.Unstructured, object
|
|||
policyv1alpha1.PropagationPolicyNameLabel: policy.GetName(),
|
||||
}
|
||||
|
||||
binding, err := d.BuildResourceBinding(object, objectKey, policyLabels)
|
||||
binding, err := d.BuildResourceBinding(object, objectKey, policyLabels, policy.Spec.PropagateDeps)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to build resourceBinding for object: %s. error: %v", objectKey, err)
|
||||
return err
|
||||
|
@ -457,12 +457,13 @@ func (d *ResourceDetector) ApplyPolicy(object *unstructured.Unstructured, object
|
|||
err = retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
|
||||
operationResult, err = controllerutil.CreateOrUpdate(context.TODO(), d.Client, bindingCopy, func() error {
|
||||
// Just update necessary fields, especially avoid modifying Spec.Clusters which is scheduling result, if already exists.
|
||||
bindingCopy.Labels = binding.Labels
|
||||
bindingCopy.Labels = util.DedupeAndMergeLabels(bindingCopy.Labels, binding.Labels)
|
||||
bindingCopy.OwnerReferences = binding.OwnerReferences
|
||||
bindingCopy.Finalizers = binding.Finalizers
|
||||
bindingCopy.Spec.Resource = binding.Spec.Resource
|
||||
bindingCopy.Spec.ReplicaRequirements = binding.Spec.ReplicaRequirements
|
||||
bindingCopy.Spec.Replicas = binding.Spec.Replicas
|
||||
bindingCopy.Spec.PropagateDeps = binding.Spec.PropagateDeps
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
|
@ -510,7 +511,7 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured,
|
|||
// For namespace-scoped resources, which namespace is not empty, building `ResourceBinding`.
|
||||
// For cluster-scoped resources, which namespace is empty, building `ClusterResourceBinding`.
|
||||
if object.GetNamespace() != "" {
|
||||
binding, err := d.BuildResourceBinding(object, objectKey, policyLabels)
|
||||
binding, err := d.BuildResourceBinding(object, objectKey, policyLabels, policy.Spec.PropagateDeps)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to build resourceBinding for object: %s. error: %v", objectKey, err)
|
||||
return err
|
||||
|
@ -547,7 +548,7 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured,
|
|||
klog.V(2).Infof("ResourceBinding(%s) is up to date.", binding.GetName())
|
||||
}
|
||||
} else {
|
||||
binding, err := d.BuildClusterResourceBinding(object, objectKey, policyLabels)
|
||||
binding, err := d.BuildClusterResourceBinding(object, objectKey, policyLabels, policy.Spec.PropagateDeps)
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to build clusterResourceBinding for object: %s. error: %v", objectKey, err)
|
||||
return err
|
||||
|
@ -654,7 +655,7 @@ func (d *ResourceDetector) ClaimClusterPolicyForObject(object *unstructured.Unst
|
|||
}
|
||||
|
||||
// BuildResourceBinding builds a desired ResourceBinding for object.
|
||||
func (d *ResourceDetector) BuildResourceBinding(object *unstructured.Unstructured, objectKey keys.ClusterWideKey, labels map[string]string) (*workv1alpha2.ResourceBinding, error) {
|
||||
func (d *ResourceDetector) BuildResourceBinding(object *unstructured.Unstructured, objectKey keys.ClusterWideKey, labels map[string]string, propagateDeps bool) (*workv1alpha2.ResourceBinding, error) {
|
||||
bindingName := names.GenerateBindingName(object.GetKind(), object.GetName())
|
||||
propagationBinding := &workv1alpha2.ResourceBinding{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
|
@ -667,6 +668,7 @@ func (d *ResourceDetector) BuildResourceBinding(object *unstructured.Unstructure
|
|||
Finalizers: []string{util.BindingControllerFinalizer},
|
||||
},
|
||||
Spec: workv1alpha2.ResourceBindingSpec{
|
||||
PropagateDeps: propagateDeps,
|
||||
Resource: workv1alpha2.ObjectReference{
|
||||
APIVersion: object.GetAPIVersion(),
|
||||
Kind: object.GetKind(),
|
||||
|
@ -692,7 +694,7 @@ func (d *ResourceDetector) BuildResourceBinding(object *unstructured.Unstructure
|
|||
}
|
||||
|
||||
// BuildClusterResourceBinding builds a desired ClusterResourceBinding for object.
|
||||
func (d *ResourceDetector) BuildClusterResourceBinding(object *unstructured.Unstructured, objectKey keys.ClusterWideKey, labels map[string]string) (*workv1alpha2.ClusterResourceBinding, error) {
|
||||
func (d *ResourceDetector) BuildClusterResourceBinding(object *unstructured.Unstructured, objectKey keys.ClusterWideKey, labels map[string]string, propagateDeps bool) (*workv1alpha2.ClusterResourceBinding, error) {
|
||||
bindingName := names.GenerateBindingName(object.GetKind(), object.GetName())
|
||||
binding := &workv1alpha2.ClusterResourceBinding{
|
||||
ObjectMeta: metav1.ObjectMeta{
|
||||
|
@ -704,6 +706,7 @@ func (d *ResourceDetector) BuildClusterResourceBinding(object *unstructured.Unst
|
|||
Finalizers: []string{util.ClusterResourceBindingControllerFinalizer},
|
||||
},
|
||||
Spec: workv1alpha2.ResourceBindingSpec{
|
||||
PropagateDeps: propagateDeps,
|
||||
Resource: workv1alpha2.ObjectReference{
|
||||
APIVersion: object.GetAPIVersion(),
|
||||
Kind: object.GetKind(),
|
||||
|
|
|
@ -8,6 +8,8 @@ import (
|
|||
const (
|
||||
// Failover indicates if scheduler should reschedule on cluster failure.
|
||||
Failover featuregate.Feature = "Failover"
|
||||
// PropagateDeps indicates if relevant resources should be propagated automatically
|
||||
PropagateDeps featuregate.Feature = "PropagateDeps"
|
||||
)
|
||||
|
||||
var (
|
||||
|
@ -15,7 +17,8 @@ var (
|
|||
FeatureGate featuregate.MutableFeatureGate = featuregate.NewFeatureGate()
|
||||
|
||||
defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
|
||||
Failover: {Default: false, PreRelease: featuregate.Alpha},
|
||||
Failover: {Default: false, PreRelease: featuregate.Alpha},
|
||||
PropagateDeps: {Default: false, PreRelease: featuregate.Alpha},
|
||||
}
|
||||
)
|
||||
|
||||
|
|
|
@ -15,6 +15,7 @@ import (
|
|||
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
|
||||
"github.com/karmada-io/karmada/pkg/features"
|
||||
"github.com/karmada-io/karmada/pkg/scheduler/metrics"
|
||||
"github.com/karmada-io/karmada/pkg/util"
|
||||
"github.com/karmada-io/karmada/pkg/util/gclient"
|
||||
)
|
||||
|
||||
|
@ -22,9 +23,12 @@ import (
|
|||
// to add event handlers for various informers.
|
||||
func (s *Scheduler) addAllEventHandlers() {
|
||||
bindingInformer := s.informerFactory.Work().V1alpha2().ResourceBindings().Informer()
|
||||
bindingInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: s.onResourceBindingAdd,
|
||||
UpdateFunc: s.onResourceBindingUpdate,
|
||||
bindingInformer.AddEventHandler(cache.FilteringResourceEventHandler{
|
||||
FilterFunc: s.resourceBindingEventFilter,
|
||||
Handler: cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: s.onResourceBindingAdd,
|
||||
UpdateFunc: s.onResourceBindingUpdate,
|
||||
},
|
||||
})
|
||||
|
||||
policyInformer := s.informerFactory.Policy().V1alpha1().PropagationPolicies().Informer()
|
||||
|
@ -33,9 +37,12 @@ func (s *Scheduler) addAllEventHandlers() {
|
|||
})
|
||||
|
||||
clusterBindingInformer := s.informerFactory.Work().V1alpha2().ClusterResourceBindings().Informer()
|
||||
clusterBindingInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: s.onResourceBindingAdd,
|
||||
UpdateFunc: s.onResourceBindingUpdate,
|
||||
clusterBindingInformer.AddEventHandler(cache.FilteringResourceEventHandler{
|
||||
FilterFunc: s.resourceBindingEventFilter,
|
||||
Handler: cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: s.onResourceBindingAdd,
|
||||
UpdateFunc: s.onResourceBindingUpdate,
|
||||
},
|
||||
})
|
||||
|
||||
clusterPolicyInformer := s.informerFactory.Policy().V1alpha1().ClusterPropagationPolicies().Informer()
|
||||
|
@ -58,6 +65,16 @@ func (s *Scheduler) addAllEventHandlers() {
|
|||
s.eventRecorder = eventBroadcaster.NewRecorder(gclient.NewSchema(), corev1.EventSource{Component: "karmada-scheduler"})
|
||||
}
|
||||
|
||||
func (s *Scheduler) resourceBindingEventFilter(obj interface{}) bool {
|
||||
accessor, err := meta.Accessor(obj)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
return util.GetLabelValue(accessor.GetLabels(), policyv1alpha1.PropagationPolicyNameLabel) != "" ||
|
||||
util.GetLabelValue(accessor.GetLabels(), policyv1alpha1.ClusterPropagationPolicyLabel) != ""
|
||||
}
|
||||
|
||||
func (s *Scheduler) onResourceBindingAdd(obj interface{}) {
|
||||
key, err := cache.MetaNamespaceKeyFunc(obj)
|
||||
if err != nil {
|
||||
|
|
|
@ -72,13 +72,16 @@ func HasScheduledReplica(scheduleResult []workv1alpha2.TargetCluster) bool {
|
|||
return false
|
||||
}
|
||||
|
||||
// GetBindingClusterNames will get clusterName list from bind clusters field
|
||||
func GetBindingClusterNames(targetClusters []workv1alpha2.TargetCluster) []string {
|
||||
var clusterNames []string
|
||||
for _, targetCluster := range targetClusters {
|
||||
clusterNames = append(clusterNames, targetCluster.Name)
|
||||
// GetBindingClusterNames will get clusterName list from bind clusters field and requiredBy field.
|
||||
func GetBindingClusterNames(targetClusters []workv1alpha2.TargetCluster, bindingSnapshot []workv1alpha2.BindingSnapshot) []string {
|
||||
clusterNames := util.ConvertToClusterNames(targetClusters)
|
||||
for _, binding := range bindingSnapshot {
|
||||
for _, targetCluster := range binding.Clusters {
|
||||
clusterNames.Insert(targetCluster.Name)
|
||||
}
|
||||
}
|
||||
return clusterNames
|
||||
|
||||
return clusterNames.List()
|
||||
}
|
||||
|
||||
// FindOrphanWorks retrieves all works that labeled with current binding(ResourceBinding or ClusterResourceBinding) objects,
|
||||
|
|
|
@ -53,7 +53,7 @@ func AggregateResourceBindingWorkStatus(c client.Client, binding *workv1alpha2.R
|
|||
|
||||
currentBindingStatus := binding.Status.DeepCopy()
|
||||
currentBindingStatus.AggregatedStatus = aggregatedStatuses
|
||||
meta.SetStatusCondition(¤tBindingStatus.Conditions, generateFullyAppliedCondition(binding.Spec.Clusters, aggregatedStatuses))
|
||||
meta.SetStatusCondition(¤tBindingStatus.Conditions, generateFullyAppliedCondition(binding.Spec, aggregatedStatuses))
|
||||
|
||||
if reflect.DeepEqual(binding.Status, currentBindingStatus) {
|
||||
klog.V(4).Infof("New aggregatedStatuses are equal with old resourceBinding(%s/%s) AggregatedStatus, no update required.",
|
||||
|
@ -100,7 +100,7 @@ func AggregateClusterResourceBindingWorkStatus(c client.Client, binding *workv1a
|
|||
|
||||
currentBindingStatus := binding.Status.DeepCopy()
|
||||
currentBindingStatus.AggregatedStatus = aggregatedStatuses
|
||||
meta.SetStatusCondition(¤tBindingStatus.Conditions, generateFullyAppliedCondition(binding.Spec.Clusters, aggregatedStatuses))
|
||||
meta.SetStatusCondition(¤tBindingStatus.Conditions, generateFullyAppliedCondition(binding.Spec, aggregatedStatuses))
|
||||
|
||||
if reflect.DeepEqual(binding.Status, currentBindingStatus) {
|
||||
klog.Infof("New aggregatedStatuses are equal with old clusterResourceBinding(%s) AggregatedStatus, no update required.", binding.Name)
|
||||
|
@ -126,8 +126,9 @@ func AggregateClusterResourceBindingWorkStatus(c client.Client, binding *workv1a
|
|||
})
|
||||
}
|
||||
|
||||
func generateFullyAppliedCondition(targetClusters []workv1alpha2.TargetCluster, aggregatedStatuses []workv1alpha2.AggregatedStatusItem) metav1.Condition {
|
||||
if len(targetClusters) == len(aggregatedStatuses) && areWorksFullyApplied(aggregatedStatuses) {
|
||||
func generateFullyAppliedCondition(spec workv1alpha2.ResourceBindingSpec, aggregatedStatuses []workv1alpha2.AggregatedStatusItem) metav1.Condition {
|
||||
clusterNames := GetBindingClusterNames(spec.Clusters, spec.RequiredBy)
|
||||
if len(clusterNames) == len(aggregatedStatuses) && areWorksFullyApplied(aggregatedStatuses) {
|
||||
return util.NewCondition(workv1alpha2.FullyApplied, FullyAppliedSuccessReason, FullyAppliedSuccessMessage, metav1.ConditionTrue)
|
||||
}
|
||||
return util.NewCondition(workv1alpha2.FullyApplied, FullyAppliedFailedReason, FullyAppliedFailedMessage, metav1.ConditionFalse)
|
||||
|
|
|
@ -22,3 +22,15 @@ func MergeLabel(obj *unstructured.Unstructured, labelKey string, labelValue stri
|
|||
workloadLabel[labelKey] = labelValue
|
||||
obj.SetLabels(workloadLabel)
|
||||
}
|
||||
|
||||
// DedupeAndMergeLabels merges the new labels into exist labels.
|
||||
func DedupeAndMergeLabels(existLabel, newLabel map[string]string) map[string]string {
|
||||
if existLabel == nil {
|
||||
return newLabel
|
||||
}
|
||||
|
||||
for k, v := range newLabel {
|
||||
existLabel[k] = v
|
||||
}
|
||||
return existLabel
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue