1055 lines
42 KiB
Go
1055 lines
42 KiB
Go
package detector
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
|
|
corev1 "k8s.io/api/core/v1"
|
|
apierrors "k8s.io/apimachinery/pkg/api/errors"
|
|
"k8s.io/apimachinery/pkg/api/meta"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
|
"k8s.io/apimachinery/pkg/labels"
|
|
"k8s.io/apimachinery/pkg/runtime"
|
|
"k8s.io/apimachinery/pkg/runtime/schema"
|
|
"k8s.io/apimachinery/pkg/util/errors"
|
|
"k8s.io/apimachinery/pkg/util/wait"
|
|
"k8s.io/client-go/discovery"
|
|
"k8s.io/client-go/dynamic"
|
|
"k8s.io/client-go/tools/cache"
|
|
"k8s.io/client-go/tools/record"
|
|
"k8s.io/client-go/util/retry"
|
|
"k8s.io/klog/v2"
|
|
"sigs.k8s.io/controller-runtime/pkg/client"
|
|
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
|
|
"sigs.k8s.io/controller-runtime/pkg/manager"
|
|
|
|
configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
|
|
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
|
|
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
|
|
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
|
|
"github.com/karmada-io/karmada/pkg/util"
|
|
"github.com/karmada-io/karmada/pkg/util/helper"
|
|
"github.com/karmada-io/karmada/pkg/util/informermanager"
|
|
"github.com/karmada-io/karmada/pkg/util/informermanager/keys"
|
|
"github.com/karmada-io/karmada/pkg/util/lifted"
|
|
"github.com/karmada-io/karmada/pkg/util/names"
|
|
"github.com/karmada-io/karmada/pkg/util/ratelimiter"
|
|
"github.com/karmada-io/karmada/pkg/util/restmapper"
|
|
)
|
|
|
|
// ResourceDetector is a resource watcher which watches all resources and reconcile the events.
|
|
type ResourceDetector struct {
|
|
// DiscoveryClientSet is used to resource discovery.
|
|
DiscoveryClientSet *discovery.DiscoveryClient
|
|
// Client is used to retrieve objects, it is often more convenient than lister.
|
|
Client client.Client
|
|
// DynamicClient used to fetch arbitrary resources.
|
|
DynamicClient dynamic.Interface
|
|
InformerManager informermanager.SingleClusterInformerManager
|
|
EventHandler cache.ResourceEventHandler
|
|
Processor util.AsyncWorker
|
|
SkippedResourceConfig *util.SkippedResourceConfig
|
|
SkippedPropagatingNamespaces map[string]struct{}
|
|
// ResourceInterpreter knows the details of resource structure.
|
|
ResourceInterpreter resourceinterpreter.ResourceInterpreter
|
|
EventRecorder record.EventRecorder
|
|
// policyReconcileWorker maintains a rate limited queue which used to store PropagationPolicy's key and
|
|
// a reconcile function to consume the items in queue.
|
|
policyReconcileWorker util.AsyncWorker
|
|
propagationPolicyLister cache.GenericLister
|
|
|
|
// clusterPolicyReconcileWorker maintains a rate limited queue which used to store ClusterPropagationPolicy's key and
|
|
// a reconcile function to consume the items in queue.
|
|
clusterPolicyReconcileWorker util.AsyncWorker
|
|
clusterPropagationPolicyLister cache.GenericLister
|
|
|
|
RESTMapper meta.RESTMapper
|
|
|
|
// waitingObjects tracks of objects which haven't be propagated yet as lack of appropriate policies.
|
|
waitingObjects map[keys.ClusterWideKey]struct{}
|
|
// waitingLock is the lock for waitingObjects operation.
|
|
waitingLock sync.RWMutex
|
|
// ConcurrentResourceTemplateSyncs is the number of resource templates that are allowed to sync concurrently.
|
|
// Larger number means responsive resource template syncing but more CPU(and network) load.
|
|
ConcurrentResourceTemplateSyncs int
|
|
// ConcurrentResourceBindingSyncs is the number of ResourceBinding that are allowed to sync concurrently.
|
|
// Larger number means responsive resource template syncing but more CPU(and network) load.
|
|
ConcurrentResourceBindingSyncs int
|
|
|
|
// RateLimiterOptions is the configuration for rate limiter which may significantly influence the performance of
|
|
// the controller.
|
|
RateLimiterOptions ratelimiter.Options
|
|
|
|
stopCh <-chan struct{}
|
|
}
|
|
|
|
// Start runs the detector, never stop until stopCh closed.
|
|
func (d *ResourceDetector) Start(ctx context.Context) error {
|
|
klog.Infof("Starting resource detector.")
|
|
d.waitingObjects = make(map[keys.ClusterWideKey]struct{})
|
|
d.stopCh = ctx.Done()
|
|
|
|
// setup policy reconcile worker
|
|
policyWorkerOptions := util.Options{
|
|
Name: "propagationPolicy reconciler",
|
|
KeyFunc: ClusterWideKeyFunc,
|
|
ReconcileFunc: d.ReconcilePropagationPolicy,
|
|
}
|
|
d.policyReconcileWorker = util.NewAsyncWorker(policyWorkerOptions)
|
|
d.policyReconcileWorker.Run(1, d.stopCh)
|
|
clusterPolicyWorkerOptions := util.Options{
|
|
Name: "clusterPropagationPolicy reconciler",
|
|
KeyFunc: ClusterWideKeyFunc,
|
|
ReconcileFunc: d.ReconcileClusterPropagationPolicy,
|
|
}
|
|
d.clusterPolicyReconcileWorker = util.NewAsyncWorker(clusterPolicyWorkerOptions)
|
|
d.clusterPolicyReconcileWorker.Run(1, d.stopCh)
|
|
|
|
// watch and enqueue PropagationPolicy changes.
|
|
propagationPolicyGVR := schema.GroupVersionResource{
|
|
Group: policyv1alpha1.GroupVersion.Group,
|
|
Version: policyv1alpha1.GroupVersion.Version,
|
|
Resource: "propagationpolicies",
|
|
}
|
|
policyHandler := informermanager.NewHandlerOnEvents(d.OnPropagationPolicyAdd, d.OnPropagationPolicyUpdate, d.OnPropagationPolicyDelete)
|
|
d.InformerManager.ForResource(propagationPolicyGVR, policyHandler)
|
|
d.propagationPolicyLister = d.InformerManager.Lister(propagationPolicyGVR)
|
|
|
|
// watch and enqueue ClusterPropagationPolicy changes.
|
|
clusterPropagationPolicyGVR := schema.GroupVersionResource{
|
|
Group: policyv1alpha1.GroupVersion.Group,
|
|
Version: policyv1alpha1.GroupVersion.Version,
|
|
Resource: "clusterpropagationpolicies",
|
|
}
|
|
clusterPolicyHandler := informermanager.NewHandlerOnEvents(d.OnClusterPropagationPolicyAdd, d.OnClusterPropagationPolicyUpdate, d.OnClusterPropagationPolicyDelete)
|
|
d.InformerManager.ForResource(clusterPropagationPolicyGVR, clusterPolicyHandler)
|
|
d.clusterPropagationPolicyLister = d.InformerManager.Lister(clusterPropagationPolicyGVR)
|
|
|
|
detectorWorkerOptions := util.Options{
|
|
Name: "resource detector",
|
|
KeyFunc: ClusterWideKeyFunc,
|
|
ReconcileFunc: d.Reconcile,
|
|
RateLimiterOptions: d.RateLimiterOptions,
|
|
}
|
|
|
|
d.EventHandler = informermanager.NewFilteringHandlerOnAllEvents(d.EventFilter, d.OnAdd, d.OnUpdate, d.OnDelete)
|
|
d.Processor = util.NewAsyncWorker(detectorWorkerOptions)
|
|
d.Processor.Run(d.ConcurrentResourceTemplateSyncs, d.stopCh)
|
|
go d.discoverResources(30 * time.Second)
|
|
|
|
<-d.stopCh
|
|
klog.Infof("Stopped as stopCh closed.")
|
|
return nil
|
|
}
|
|
|
|
// Check if our ResourceDetector implements necessary interfaces
|
|
var (
|
|
_ manager.Runnable = &ResourceDetector{}
|
|
_ manager.LeaderElectionRunnable = &ResourceDetector{}
|
|
)
|
|
|
|
func (d *ResourceDetector) discoverResources(period time.Duration) {
|
|
wait.Until(func() {
|
|
newResources := lifted.GetDeletableResources(d.DiscoveryClientSet)
|
|
for r := range newResources {
|
|
if d.InformerManager.IsHandlerExist(r, d.EventHandler) || d.gvrDisabled(r) {
|
|
continue
|
|
}
|
|
klog.Infof("Setup informer for %s", r.String())
|
|
d.InformerManager.ForResource(r, d.EventHandler)
|
|
}
|
|
d.InformerManager.Start()
|
|
}, period, d.stopCh)
|
|
}
|
|
|
|
// gvrDisabled returns whether GroupVersionResource is disabled.
|
|
func (d *ResourceDetector) gvrDisabled(gvr schema.GroupVersionResource) bool {
|
|
if d.SkippedResourceConfig == nil {
|
|
return false
|
|
}
|
|
|
|
if d.SkippedResourceConfig.GroupVersionDisabled(gvr.GroupVersion()) {
|
|
return true
|
|
}
|
|
if d.SkippedResourceConfig.GroupDisabled(gvr.Group) {
|
|
return true
|
|
}
|
|
|
|
gvks, err := d.RESTMapper.KindsFor(gvr)
|
|
if err != nil {
|
|
klog.Errorf("gvr(%s) transform failed: %v", gvr.String(), err)
|
|
return false
|
|
}
|
|
|
|
for _, gvk := range gvks {
|
|
if d.SkippedResourceConfig.GroupVersionKindDisabled(gvk) {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// NeedLeaderElection implements LeaderElectionRunnable interface.
|
|
// So that the detector could run in the leader election mode.
|
|
func (d *ResourceDetector) NeedLeaderElection() bool {
|
|
return true
|
|
}
|
|
|
|
// Reconcile performs a full reconciliation for the object referred to by the key.
|
|
// The key will be re-queued if an error is non-nil.
|
|
func (d *ResourceDetector) Reconcile(key util.QueueKey) error {
|
|
clusterWideKey, ok := key.(keys.ClusterWideKey)
|
|
if !ok {
|
|
klog.Error("invalid key")
|
|
return fmt.Errorf("invalid key")
|
|
}
|
|
klog.Infof("Reconciling object: %s", clusterWideKey)
|
|
|
|
object, err := d.GetUnstructuredObject(clusterWideKey)
|
|
if err != nil {
|
|
if apierrors.IsNotFound(err) {
|
|
// The resource may no longer exist, in which case we try (may not exist in waiting list) remove it from waiting list and stop processing.
|
|
d.RemoveWaiting(clusterWideKey)
|
|
|
|
// Once resource be deleted, the derived ResourceBinding or ClusterResourceBinding also need to be cleaned up,
|
|
// currently we do that by setting owner reference to derived objects.
|
|
return nil
|
|
}
|
|
klog.Errorf("Failed to get unstructured object(%s), error: %v", clusterWideKey, err)
|
|
return err
|
|
}
|
|
|
|
// first attempts to match policy in it's namespace.
|
|
propagationPolicy, err := d.LookForMatchedPolicy(object, clusterWideKey)
|
|
if err != nil {
|
|
klog.Errorf("Failed to retrieve policy for object: %s, error: %v", clusterWideKey.String(), err)
|
|
return err
|
|
}
|
|
if propagationPolicy != nil {
|
|
// return err when dependents not present, that we can retry at next reconcile.
|
|
if present, err := helper.IsDependentOverridesPresent(d.Client, propagationPolicy); err != nil || !present {
|
|
klog.Infof("Waiting for dependent overrides present for policy(%s/%s)", propagationPolicy.Namespace, propagationPolicy.Name)
|
|
return fmt.Errorf("waiting for dependent overrides")
|
|
}
|
|
d.RemoveWaiting(clusterWideKey)
|
|
return d.ApplyPolicy(object, clusterWideKey, propagationPolicy)
|
|
}
|
|
|
|
// reaching here means there is no appropriate PropagationPolicy, attempts to match a ClusterPropagationPolicy.
|
|
clusterPolicy, err := d.LookForMatchedClusterPolicy(object, clusterWideKey)
|
|
if err != nil {
|
|
klog.Errorf("Failed to retrieve cluster policy for object: %s, error: %v", clusterWideKey.String(), err)
|
|
return err
|
|
}
|
|
if clusterPolicy != nil {
|
|
d.RemoveWaiting(clusterWideKey)
|
|
return d.ApplyClusterPolicy(object, clusterWideKey, clusterPolicy)
|
|
}
|
|
|
|
if d.isWaiting(clusterWideKey) {
|
|
// reaching here means there is no appropriate policy for the object
|
|
d.EventRecorder.Event(object, corev1.EventTypeWarning, workv1alpha2.EventReasonApplyPolicyFailed, "No policy match for resource")
|
|
return nil
|
|
}
|
|
|
|
// put it into waiting list and retry once in case the resource and propagation policy come at the same time
|
|
// see https://github.com/karmada-io/karmada/issues/1195
|
|
d.AddWaiting(clusterWideKey)
|
|
return fmt.Errorf("no matched propagation policy")
|
|
}
|
|
|
|
// EventFilter tells if an object should be take care of.
|
|
//
|
|
// All objects under Kubernetes reserved namespace should be ignored:
|
|
// - kube-system
|
|
// - kube-public
|
|
// - kube-node-lease
|
|
// All objects under Karmada reserved namespace should be ignored:
|
|
// - karmada-system
|
|
// - karmada-cluster
|
|
// - karmada-es-*
|
|
// All objects which API group defined by Karmada should be ignored:
|
|
// - cluster.karmada.io
|
|
// - policy.karmada.io
|
|
//
|
|
// The api objects listed above will be ignored by default, as we don't want users to manually input the things
|
|
// they don't care when trying to skip something else.
|
|
//
|
|
// If '--skipped-propagating-apis' which used to specific the APIs should be ignored in addition to the defaults, is set,
|
|
// the specified apis will be ignored as well.
|
|
//
|
|
// If '--skipped-propagating-namespaces' is specified, all APIs in the skipped-propagating-namespaces will be ignored.
|
|
func (d *ResourceDetector) EventFilter(obj interface{}) bool {
|
|
key, err := ClusterWideKeyFunc(obj)
|
|
if err != nil {
|
|
return false
|
|
}
|
|
|
|
clusterWideKey, ok := key.(keys.ClusterWideKey)
|
|
if !ok {
|
|
klog.Errorf("Invalid key")
|
|
return false
|
|
}
|
|
|
|
if names.IsReservedNamespace(clusterWideKey.Namespace) {
|
|
return false
|
|
}
|
|
|
|
if d.SkippedResourceConfig != nil {
|
|
if d.SkippedResourceConfig.GroupDisabled(clusterWideKey.Group) {
|
|
klog.V(4).Infof("Skip event for %s", clusterWideKey.Group)
|
|
return false
|
|
}
|
|
if d.SkippedResourceConfig.GroupVersionDisabled(clusterWideKey.GroupVersion()) {
|
|
klog.V(4).Infof("Skip event for %s", clusterWideKey.GroupVersion())
|
|
return false
|
|
}
|
|
if d.SkippedResourceConfig.GroupVersionKindDisabled(clusterWideKey.GroupVersionKind()) {
|
|
klog.V(4).Infof("Skip event for %s", clusterWideKey.GroupVersionKind())
|
|
return false
|
|
}
|
|
}
|
|
// if SkippedPropagatingNamespaces is set, skip object events in these namespaces.
|
|
if _, ok := d.SkippedPropagatingNamespaces[clusterWideKey.Namespace]; ok {
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
// OnAdd handles object add event and push the object to queue.
|
|
func (d *ResourceDetector) OnAdd(obj interface{}) {
|
|
runtimeObj, ok := obj.(runtime.Object)
|
|
if !ok {
|
|
return
|
|
}
|
|
d.Processor.Enqueue(runtimeObj)
|
|
}
|
|
|
|
// OnUpdate handles object update event and push the object to queue.
|
|
func (d *ResourceDetector) OnUpdate(oldObj, newObj interface{}) {
|
|
d.OnAdd(newObj)
|
|
}
|
|
|
|
// OnDelete handles object delete event and push the object to queue.
|
|
func (d *ResourceDetector) OnDelete(obj interface{}) {
|
|
d.OnAdd(obj)
|
|
}
|
|
|
|
// LookForMatchedPolicy tries to find a policy for object referenced by object key.
|
|
func (d *ResourceDetector) LookForMatchedPolicy(object *unstructured.Unstructured, objectKey keys.ClusterWideKey) (*policyv1alpha1.PropagationPolicy, error) {
|
|
if len(objectKey.Namespace) == 0 {
|
|
return nil, nil
|
|
}
|
|
|
|
klog.V(2).Infof("attempts to match policy for resource(%s)", objectKey)
|
|
policyObjects, err := d.propagationPolicyLister.ByNamespace(objectKey.Namespace).List(labels.Everything())
|
|
if err != nil {
|
|
klog.Errorf("Failed to list propagation policy: %v", err)
|
|
return nil, err
|
|
}
|
|
if len(policyObjects) == 0 {
|
|
klog.V(2).Infof("no propagationpolicy find in namespace(%s).", objectKey.Namespace)
|
|
return nil, nil
|
|
}
|
|
|
|
policyList := make([]*policyv1alpha1.PropagationPolicy, 0)
|
|
for index := range policyObjects {
|
|
policy, err := helper.ConvertToPropagationPolicy(policyObjects[index].(*unstructured.Unstructured))
|
|
if err != nil {
|
|
klog.Errorf("Failed to convert PropagationPolicy from unstructured object: %v", err)
|
|
return nil, err
|
|
}
|
|
policyList = append(policyList, policy)
|
|
}
|
|
|
|
var matchedPolicy *policyv1alpha1.PropagationPolicy
|
|
for _, policy := range policyList {
|
|
if util.ResourceMatchSelectors(object, policy.Spec.ResourceSelectors...) {
|
|
matchedPolicy = GetHigherPriorityPropagationPolicy(matchedPolicy, policy)
|
|
}
|
|
}
|
|
|
|
if matchedPolicy == nil {
|
|
klog.V(2).Infof("no propagationpolicy match for resource(%s)", objectKey)
|
|
return nil, nil
|
|
}
|
|
klog.V(2).Infof("Matched policy(%s/%s) for resource(%s)", matchedPolicy.Namespace, matchedPolicy.Name, objectKey)
|
|
return matchedPolicy, nil
|
|
}
|
|
|
|
// LookForMatchedClusterPolicy tries to find a ClusterPropagationPolicy for object referenced by object key.
|
|
func (d *ResourceDetector) LookForMatchedClusterPolicy(object *unstructured.Unstructured, objectKey keys.ClusterWideKey) (*policyv1alpha1.ClusterPropagationPolicy, error) {
|
|
klog.V(2).Infof("attempts to match cluster policy for resource(%s)", objectKey)
|
|
policyObjects, err := d.clusterPropagationPolicyLister.List(labels.Everything())
|
|
if err != nil {
|
|
klog.Errorf("Failed to list cluster propagation policy: %v", err)
|
|
return nil, err
|
|
}
|
|
if len(policyObjects) == 0 {
|
|
klog.V(2).Infof("no clusterpropagationpolicy find.")
|
|
return nil, nil
|
|
}
|
|
|
|
policyList := make([]*policyv1alpha1.ClusterPropagationPolicy, 0)
|
|
for index := range policyObjects {
|
|
policy, err := helper.ConvertToClusterPropagationPolicy(policyObjects[index].(*unstructured.Unstructured))
|
|
if err != nil {
|
|
klog.Errorf("Failed to convert ClusterPropagationPolicy from unstructured object: %v", err)
|
|
return nil, err
|
|
}
|
|
policyList = append(policyList, policy)
|
|
}
|
|
|
|
var matchedClusterPolicy *policyv1alpha1.ClusterPropagationPolicy
|
|
for _, policy := range policyList {
|
|
if util.ResourceMatchSelectors(object, policy.Spec.ResourceSelectors...) {
|
|
matchedClusterPolicy = GetHigherPriorityClusterPropagationPolicy(matchedClusterPolicy, policy)
|
|
}
|
|
}
|
|
|
|
if matchedClusterPolicy == nil {
|
|
klog.V(2).Infof("no propagationpolicy match for resource(%s)", objectKey)
|
|
return nil, nil
|
|
}
|
|
klog.V(2).Infof("Matched cluster policy(%s) for resource(%s)", matchedClusterPolicy.Name, objectKey)
|
|
return matchedClusterPolicy, nil
|
|
}
|
|
|
|
// ApplyPolicy starts propagate the object referenced by object key according to PropagationPolicy.
|
|
func (d *ResourceDetector) ApplyPolicy(object *unstructured.Unstructured, objectKey keys.ClusterWideKey, policy *policyv1alpha1.PropagationPolicy) (err error) {
|
|
klog.Infof("Applying policy(%s%s) for object: %s", policy.Namespace, policy.Name, objectKey)
|
|
defer func() {
|
|
if err != nil {
|
|
d.EventRecorder.Eventf(object, corev1.EventTypeWarning, workv1alpha2.EventReasonApplyPolicyFailed, "Apply policy(%s/%s) failed", policy.Namespace, policy.Name)
|
|
} else {
|
|
d.EventRecorder.Eventf(object, corev1.EventTypeNormal, workv1alpha2.EventReasonApplyPolicySucceed, "Apply policy(%s/%s) succeed", policy.Namespace, policy.Name)
|
|
}
|
|
}()
|
|
|
|
if err := d.ClaimPolicyForObject(object, policy.Namespace, policy.Name); err != nil {
|
|
klog.Errorf("Failed to claim policy(%s) for object: %s", policy.Name, object)
|
|
return err
|
|
}
|
|
|
|
policyLabels := map[string]string{
|
|
policyv1alpha1.PropagationPolicyNamespaceLabel: policy.GetNamespace(),
|
|
policyv1alpha1.PropagationPolicyNameLabel: policy.GetName(),
|
|
}
|
|
|
|
binding, err := d.BuildResourceBinding(object, objectKey, policyLabels, policy.Spec.PropagateDeps)
|
|
if err != nil {
|
|
klog.Errorf("Failed to build resourceBinding for object: %s. error: %v", objectKey, err)
|
|
return err
|
|
}
|
|
bindingCopy := binding.DeepCopy()
|
|
var operationResult controllerutil.OperationResult
|
|
err = retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
|
|
operationResult, err = controllerutil.CreateOrUpdate(context.TODO(), d.Client, bindingCopy, func() error {
|
|
// Just update necessary fields, especially avoid modifying Spec.Clusters which is scheduling result, if already exists.
|
|
bindingCopy.Labels = util.DedupeAndMergeLabels(bindingCopy.Labels, binding.Labels)
|
|
bindingCopy.OwnerReferences = binding.OwnerReferences
|
|
bindingCopy.Finalizers = binding.Finalizers
|
|
bindingCopy.Spec.Resource = binding.Spec.Resource
|
|
bindingCopy.Spec.ReplicaRequirements = binding.Spec.ReplicaRequirements
|
|
bindingCopy.Spec.Replicas = binding.Spec.Replicas
|
|
bindingCopy.Spec.PropagateDeps = binding.Spec.PropagateDeps
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
klog.Errorf("Failed to apply policy(%s) for object: %s. error: %v", policy.Name, objectKey, err)
|
|
return err
|
|
}
|
|
|
|
if operationResult == controllerutil.OperationResultCreated {
|
|
klog.Infof("Create ResourceBinding(%s/%s) successfully.", binding.GetNamespace(), binding.GetName())
|
|
} else if operationResult == controllerutil.OperationResultUpdated {
|
|
klog.Infof("Update ResourceBinding(%s/%s) successfully.", binding.GetNamespace(), binding.GetName())
|
|
} else {
|
|
klog.V(2).Infof("ResourceBinding(%s/%s) is up to date.", binding.GetNamespace(), binding.GetName())
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// ApplyClusterPolicy starts propagate the object referenced by object key according to ClusterPropagationPolicy.
|
|
func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured, objectKey keys.ClusterWideKey, policy *policyv1alpha1.ClusterPropagationPolicy) (err error) {
|
|
klog.Infof("Applying cluster policy(%s) for object: %s", policy.Name, objectKey)
|
|
defer func() {
|
|
if err != nil {
|
|
d.EventRecorder.Eventf(object, corev1.EventTypeWarning, workv1alpha2.EventReasonApplyPolicyFailed, "Apply cluster policy(%s) failed", policy.Name)
|
|
} else {
|
|
d.EventRecorder.Eventf(object, corev1.EventTypeNormal, workv1alpha2.EventReasonApplyPolicySucceed, "Apply policy(%s/%s) succeed", policy.Name)
|
|
}
|
|
}()
|
|
|
|
if err := d.ClaimClusterPolicyForObject(object, policy.Name); err != nil {
|
|
klog.Errorf("Failed to claim cluster policy(%s) for object: %s", policy.Name, object)
|
|
return err
|
|
}
|
|
|
|
policyLabels := map[string]string{
|
|
policyv1alpha1.ClusterPropagationPolicyLabel: policy.GetName(),
|
|
}
|
|
|
|
// Build `ResourceBinding` or `ClusterResourceBinding` according to the resource template's scope.
|
|
// For namespace-scoped resources, which namespace is not empty, building `ResourceBinding`.
|
|
// For cluster-scoped resources, which namespace is empty, building `ClusterResourceBinding`.
|
|
if object.GetNamespace() != "" {
|
|
binding, err := d.BuildResourceBinding(object, objectKey, policyLabels, policy.Spec.PropagateDeps)
|
|
if err != nil {
|
|
klog.Errorf("Failed to build resourceBinding for object: %s. error: %v", objectKey, err)
|
|
return err
|
|
}
|
|
bindingCopy := binding.DeepCopy()
|
|
var operationResult controllerutil.OperationResult
|
|
err = retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
|
|
operationResult, err = controllerutil.CreateOrUpdate(context.TODO(), d.Client, bindingCopy, func() error {
|
|
// Just update necessary fields, especially avoid modifying Spec.Clusters which is scheduling result, if already exists.
|
|
bindingCopy.Labels = binding.Labels
|
|
bindingCopy.OwnerReferences = binding.OwnerReferences
|
|
bindingCopy.Finalizers = binding.Finalizers
|
|
bindingCopy.Spec.Resource = binding.Spec.Resource
|
|
bindingCopy.Spec.ReplicaRequirements = binding.Spec.ReplicaRequirements
|
|
bindingCopy.Spec.Replicas = binding.Spec.Replicas
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return nil
|
|
})
|
|
|
|
if err != nil {
|
|
klog.Errorf("Failed to apply cluster policy(%s) for object: %s. error: %v", policy.Name, objectKey, err)
|
|
return err
|
|
}
|
|
|
|
if operationResult == controllerutil.OperationResultCreated {
|
|
klog.Infof("Create ResourceBinding(%s) successfully.", binding.GetName())
|
|
} else if operationResult == controllerutil.OperationResultUpdated {
|
|
klog.Infof("Update ResourceBinding(%s) successfully.", binding.GetName())
|
|
} else {
|
|
klog.V(2).Infof("ResourceBinding(%s) is up to date.", binding.GetName())
|
|
}
|
|
} else {
|
|
binding, err := d.BuildClusterResourceBinding(object, objectKey, policyLabels, policy.Spec.PropagateDeps)
|
|
if err != nil {
|
|
klog.Errorf("Failed to build clusterResourceBinding for object: %s. error: %v", objectKey, err)
|
|
return err
|
|
}
|
|
bindingCopy := binding.DeepCopy()
|
|
operationResult, err := controllerutil.CreateOrUpdate(context.TODO(), d.Client, bindingCopy, func() error {
|
|
// Just update necessary fields, especially avoid modifying Spec.Clusters which is scheduling result, if already exists.
|
|
bindingCopy.Labels = binding.Labels
|
|
bindingCopy.OwnerReferences = binding.OwnerReferences
|
|
bindingCopy.Finalizers = binding.Finalizers
|
|
bindingCopy.Spec.Resource = binding.Spec.Resource
|
|
bindingCopy.Spec.ReplicaRequirements = binding.Spec.ReplicaRequirements
|
|
bindingCopy.Spec.Replicas = binding.Spec.Replicas
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
klog.Errorf("Failed to apply cluster policy(%s) for object: %s. error: %v", policy.Name, objectKey, err)
|
|
return err
|
|
}
|
|
|
|
if operationResult == controllerutil.OperationResultCreated {
|
|
klog.Infof("Create ClusterResourceBinding(%s) successfully.", binding.GetName())
|
|
} else if operationResult == controllerutil.OperationResultUpdated {
|
|
klog.Infof("Update ClusterResourceBinding(%s) successfully.", binding.GetName())
|
|
} else {
|
|
klog.V(2).Infof("ClusterResourceBinding(%s) is up to date.", binding.GetName())
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// GetUnstructuredObject retrieves object by key and returned its unstructured.
|
|
func (d *ResourceDetector) GetUnstructuredObject(objectKey keys.ClusterWideKey) (*unstructured.Unstructured, error) {
|
|
objectGVR, err := restmapper.GetGroupVersionResource(d.RESTMapper, objectKey.GroupVersionKind())
|
|
if err != nil {
|
|
klog.Errorf("Failed to get GVR of object: %s, error: %v", objectKey, err)
|
|
return nil, err
|
|
}
|
|
|
|
object, err := d.InformerManager.Lister(objectGVR).Get(objectKey.NamespaceKey())
|
|
if err != nil {
|
|
if !apierrors.IsNotFound(err) {
|
|
klog.Errorf("Failed to get object(%s), error: %v", objectKey, err)
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
unstructuredObj, err := helper.ToUnstructured(object)
|
|
if err != nil {
|
|
klog.Errorf("Failed to transform object(%s), error: %v", objectKey, err)
|
|
return nil, err
|
|
}
|
|
|
|
return unstructuredObj, nil
|
|
}
|
|
|
|
// GetObject retrieves object from local cache.
|
|
func (d *ResourceDetector) GetObject(objectKey keys.ClusterWideKey) (runtime.Object, error) {
|
|
objectGVR, err := restmapper.GetGroupVersionResource(d.RESTMapper, objectKey.GroupVersionKind())
|
|
if err != nil {
|
|
klog.Errorf("Failed to get GVK of object: %s, error: %v", objectKey, err)
|
|
return nil, err
|
|
}
|
|
|
|
object, err := d.InformerManager.Lister(objectGVR).Get(objectKey.NamespaceKey())
|
|
if err != nil {
|
|
if !apierrors.IsNotFound(err) {
|
|
klog.Errorf("Failed to get object(%s), error: %v", objectKey, err)
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
return object, nil
|
|
}
|
|
|
|
// ClaimPolicyForObject set policy identifier which the object associated with.
|
|
func (d *ResourceDetector) ClaimPolicyForObject(object *unstructured.Unstructured, policyNamespace string, policyName string) error {
|
|
claimedNS := util.GetLabelValue(object.GetLabels(), policyv1alpha1.PropagationPolicyNamespaceLabel)
|
|
claimedName := util.GetLabelValue(object.GetLabels(), policyv1alpha1.PropagationPolicyNameLabel)
|
|
|
|
// object has been claimed, don't need to claim again
|
|
if claimedNS == policyNamespace && claimedName == policyName {
|
|
return nil
|
|
}
|
|
|
|
util.MergeLabel(object, policyv1alpha1.PropagationPolicyNamespaceLabel, policyNamespace)
|
|
util.MergeLabel(object, policyv1alpha1.PropagationPolicyNameLabel, policyName)
|
|
|
|
return d.Client.Update(context.TODO(), object)
|
|
}
|
|
|
|
// ClaimClusterPolicyForObject set cluster identifier which the object associated with.
|
|
func (d *ResourceDetector) ClaimClusterPolicyForObject(object *unstructured.Unstructured, policyName string) error {
|
|
claimedName := util.GetLabelValue(object.GetLabels(), policyv1alpha1.ClusterPropagationPolicyLabel)
|
|
|
|
// object has been claimed, don't need to claim again
|
|
if claimedName == policyName {
|
|
return nil
|
|
}
|
|
|
|
util.MergeLabel(object, policyv1alpha1.ClusterPropagationPolicyLabel, policyName)
|
|
return d.Client.Update(context.TODO(), object)
|
|
}
|
|
|
|
// BuildResourceBinding builds a desired ResourceBinding for object.
|
|
func (d *ResourceDetector) BuildResourceBinding(object *unstructured.Unstructured, objectKey keys.ClusterWideKey, labels map[string]string, propagateDeps bool) (*workv1alpha2.ResourceBinding, error) {
|
|
bindingName := names.GenerateBindingName(object.GetKind(), object.GetName())
|
|
propagationBinding := &workv1alpha2.ResourceBinding{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: bindingName,
|
|
Namespace: object.GetNamespace(),
|
|
OwnerReferences: []metav1.OwnerReference{
|
|
*metav1.NewControllerRef(object, objectKey.GroupVersionKind()),
|
|
},
|
|
Labels: labels,
|
|
Finalizers: []string{util.BindingControllerFinalizer},
|
|
},
|
|
Spec: workv1alpha2.ResourceBindingSpec{
|
|
PropagateDeps: propagateDeps,
|
|
Resource: workv1alpha2.ObjectReference{
|
|
APIVersion: object.GetAPIVersion(),
|
|
Kind: object.GetKind(),
|
|
Namespace: object.GetNamespace(),
|
|
Name: object.GetName(),
|
|
UID: object.GetUID(),
|
|
ResourceVersion: object.GetResourceVersion(),
|
|
},
|
|
},
|
|
}
|
|
|
|
if d.ResourceInterpreter.HookEnabled(object.GroupVersionKind(), configv1alpha1.InterpreterOperationInterpretReplica) {
|
|
replicas, replicaRequirements, err := d.ResourceInterpreter.GetReplicas(object)
|
|
if err != nil {
|
|
klog.Errorf("Failed to customize replicas for %s(%s), %v", object.GroupVersionKind(), object.GetName(), err)
|
|
return nil, err
|
|
}
|
|
propagationBinding.Spec.Replicas = replicas
|
|
propagationBinding.Spec.ReplicaRequirements = replicaRequirements
|
|
}
|
|
|
|
return propagationBinding, nil
|
|
}
|
|
|
|
// BuildClusterResourceBinding builds a desired ClusterResourceBinding for object.
|
|
func (d *ResourceDetector) BuildClusterResourceBinding(object *unstructured.Unstructured, objectKey keys.ClusterWideKey, labels map[string]string, propagateDeps bool) (*workv1alpha2.ClusterResourceBinding, error) {
|
|
bindingName := names.GenerateBindingName(object.GetKind(), object.GetName())
|
|
binding := &workv1alpha2.ClusterResourceBinding{
|
|
ObjectMeta: metav1.ObjectMeta{
|
|
Name: bindingName,
|
|
OwnerReferences: []metav1.OwnerReference{
|
|
*metav1.NewControllerRef(object, objectKey.GroupVersionKind()),
|
|
},
|
|
Labels: labels,
|
|
Finalizers: []string{util.ClusterResourceBindingControllerFinalizer},
|
|
},
|
|
Spec: workv1alpha2.ResourceBindingSpec{
|
|
PropagateDeps: propagateDeps,
|
|
Resource: workv1alpha2.ObjectReference{
|
|
APIVersion: object.GetAPIVersion(),
|
|
Kind: object.GetKind(),
|
|
Name: object.GetName(),
|
|
UID: object.GetUID(),
|
|
ResourceVersion: object.GetResourceVersion(),
|
|
},
|
|
},
|
|
}
|
|
|
|
if d.ResourceInterpreter.HookEnabled(object.GroupVersionKind(), configv1alpha1.InterpreterOperationInterpretReplica) {
|
|
replicas, replicaRequirements, err := d.ResourceInterpreter.GetReplicas(object)
|
|
if err != nil {
|
|
klog.Errorf("Failed to customize replicas for %s(%s), %v", object.GroupVersionKind(), object.GetName(), err)
|
|
return nil, err
|
|
}
|
|
binding.Spec.Replicas = replicas
|
|
binding.Spec.ReplicaRequirements = replicaRequirements
|
|
}
|
|
|
|
return binding, nil
|
|
}
|
|
|
|
// isWaiting indicates if the object is in waiting list.
|
|
func (d *ResourceDetector) isWaiting(objectKey keys.ClusterWideKey) bool {
|
|
d.waitingLock.RLock()
|
|
_, ok := d.waitingObjects[objectKey]
|
|
d.waitingLock.RUnlock()
|
|
return ok
|
|
}
|
|
|
|
// AddWaiting adds object's key to waiting list.
|
|
func (d *ResourceDetector) AddWaiting(objectKey keys.ClusterWideKey) {
|
|
d.waitingLock.Lock()
|
|
defer d.waitingLock.Unlock()
|
|
|
|
d.waitingObjects[objectKey] = struct{}{}
|
|
klog.V(1).Infof("Add object(%s) to waiting list, length of list is: %d", objectKey.String(), len(d.waitingObjects))
|
|
}
|
|
|
|
// RemoveWaiting removes object's key from waiting list.
|
|
func (d *ResourceDetector) RemoveWaiting(objectKey keys.ClusterWideKey) {
|
|
d.waitingLock.Lock()
|
|
defer d.waitingLock.Unlock()
|
|
|
|
delete(d.waitingObjects, objectKey)
|
|
}
|
|
|
|
// GetMatching gets objects keys in waiting list that matches one of resource selectors.
|
|
func (d *ResourceDetector) GetMatching(resourceSelectors []policyv1alpha1.ResourceSelector) []keys.ClusterWideKey {
|
|
d.waitingLock.RLock()
|
|
defer d.waitingLock.RUnlock()
|
|
|
|
var matchedResult []keys.ClusterWideKey
|
|
|
|
for waitKey := range d.waitingObjects {
|
|
waitObj, err := d.GetUnstructuredObject(waitKey)
|
|
if err != nil {
|
|
// all object in waiting list should exist. Just print a log to trace.
|
|
klog.Errorf("Failed to get object(%s), error: %v", waitKey.String(), err)
|
|
continue
|
|
}
|
|
|
|
for _, rs := range resourceSelectors {
|
|
if util.ResourceMatches(waitObj, rs) {
|
|
matchedResult = append(matchedResult, waitKey)
|
|
break
|
|
}
|
|
}
|
|
}
|
|
|
|
return matchedResult
|
|
}
|
|
|
|
// OnPropagationPolicyAdd handles object add event and push the object to queue.
|
|
func (d *ResourceDetector) OnPropagationPolicyAdd(obj interface{}) {
|
|
key, err := ClusterWideKeyFunc(obj)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
klog.V(2).Infof("Create PropagationPolicy(%s)", key)
|
|
d.policyReconcileWorker.Add(key)
|
|
}
|
|
|
|
// OnPropagationPolicyUpdate handles object update event and push the object to queue.
|
|
func (d *ResourceDetector) OnPropagationPolicyUpdate(oldObj, newObj interface{}) {
|
|
// currently do nothing, since a policy's resource selector can not be updated.
|
|
}
|
|
|
|
// OnPropagationPolicyDelete handles object delete event and push the object to queue.
|
|
func (d *ResourceDetector) OnPropagationPolicyDelete(obj interface{}) {
|
|
key, err := ClusterWideKeyFunc(obj)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
klog.V(2).Infof("Delete PropagationPolicy(%s)", key)
|
|
d.policyReconcileWorker.Add(key)
|
|
}
|
|
|
|
// ReconcilePropagationPolicy handles PropagationPolicy resource changes.
|
|
// When adding a PropagationPolicy, the detector will pick the objects in waitingObjects list that matches the policy and
|
|
// put the object to queue.
|
|
// When removing a PropagationPolicy, the relevant ResourceBinding will be removed and
|
|
// the relevant objects will be put into queue again to try another policy.
|
|
func (d *ResourceDetector) ReconcilePropagationPolicy(key util.QueueKey) error {
|
|
ckey, ok := key.(keys.ClusterWideKey)
|
|
if !ok { // should not happen
|
|
klog.Error("Found invalid key when reconciling propagation policy.")
|
|
return fmt.Errorf("invalid key")
|
|
}
|
|
|
|
unstructuredObj, err := d.propagationPolicyLister.Get(ckey.NamespaceKey())
|
|
if err != nil {
|
|
if apierrors.IsNotFound(err) {
|
|
klog.Infof("PropagationPolicy(%s) has been removed.", ckey.NamespaceKey())
|
|
return d.HandlePropagationPolicyDeletion(ckey.Namespace, ckey.Name)
|
|
}
|
|
klog.Errorf("Failed to get PropagationPolicy(%s): %v", ckey.NamespaceKey(), err)
|
|
return err
|
|
}
|
|
|
|
klog.Infof("PropagationPolicy(%s) has been added.", ckey.NamespaceKey())
|
|
propagationObject, err := helper.ConvertToPropagationPolicy(unstructuredObj.(*unstructured.Unstructured))
|
|
if err != nil {
|
|
klog.Errorf("Failed to convert PropagationPolicy(%s) from unstructured object: %v", ckey.NamespaceKey(), err)
|
|
return err
|
|
}
|
|
return d.HandlePropagationPolicyCreation(propagationObject)
|
|
}
|
|
|
|
// OnClusterPropagationPolicyAdd handles object add event and push the object to queue.
|
|
func (d *ResourceDetector) OnClusterPropagationPolicyAdd(obj interface{}) {
|
|
key, err := ClusterWideKeyFunc(obj)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
klog.V(2).Infof("Create ClusterPropagationPolicy(%s)", key)
|
|
d.clusterPolicyReconcileWorker.Add(key)
|
|
}
|
|
|
|
// OnClusterPropagationPolicyUpdate handles object update event and push the object to queue.
|
|
func (d *ResourceDetector) OnClusterPropagationPolicyUpdate(oldObj, newObj interface{}) {
|
|
// currently do nothing, since a policy's resource selector can not be updated.
|
|
}
|
|
|
|
// OnClusterPropagationPolicyDelete handles object delete event and push the object to queue.
|
|
func (d *ResourceDetector) OnClusterPropagationPolicyDelete(obj interface{}) {
|
|
key, err := ClusterWideKeyFunc(obj)
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
klog.V(2).Infof("Delete ClusterPropagationPolicy(%s)", key)
|
|
d.clusterPolicyReconcileWorker.Add(key)
|
|
}
|
|
|
|
// ReconcileClusterPropagationPolicy handles ClusterPropagationPolicy resource changes.
|
|
// When adding a ClusterPropagationPolicy, the detector will pick the objects in waitingObjects list that matches the policy and
|
|
// put the object to queue.
|
|
// When removing a ClusterPropagationPolicy, the relevant ClusterResourceBinding will be removed and
|
|
// the relevant objects will be put into queue again to try another policy.
|
|
func (d *ResourceDetector) ReconcileClusterPropagationPolicy(key util.QueueKey) error {
|
|
ckey, ok := key.(keys.ClusterWideKey)
|
|
if !ok { // should not happen
|
|
klog.Error("Found invalid key when reconciling cluster propagation policy.")
|
|
return fmt.Errorf("invalid key")
|
|
}
|
|
|
|
unstructuredObj, err := d.clusterPropagationPolicyLister.Get(ckey.NamespaceKey())
|
|
if err != nil {
|
|
if apierrors.IsNotFound(err) {
|
|
klog.Infof("ClusterPropagationPolicy(%s) has been removed.", ckey.NamespaceKey())
|
|
return d.HandleClusterPropagationPolicyDeletion(ckey.Name)
|
|
}
|
|
|
|
klog.Errorf("Failed to get ClusterPropagationPolicy(%s): %v", ckey.NamespaceKey(), err)
|
|
return err
|
|
}
|
|
|
|
klog.Infof("Policy(%s) has been added", ckey.NamespaceKey())
|
|
propagationObject, err := helper.ConvertToClusterPropagationPolicy(unstructuredObj.(*unstructured.Unstructured))
|
|
if err != nil {
|
|
klog.Errorf("Failed to convert ClusterPropagationPolicy(%s) from unstructured object: %v", ckey.NamespaceKey(), err)
|
|
return err
|
|
}
|
|
return d.HandleClusterPropagationPolicyCreation(propagationObject)
|
|
}
|
|
|
|
// HandlePropagationPolicyDeletion handles PropagationPolicy delete event.
|
|
// After a policy is removed, the label marked on relevant resource template will be removed(which gives
|
|
// the resource template a change to match another policy).
|
|
//
|
|
// Note: The relevant ResourceBinding will continue to exist until the resource template is gone.
|
|
func (d *ResourceDetector) HandlePropagationPolicyDeletion(policyNS string, policyName string) error {
|
|
labelSet := labels.Set{
|
|
policyv1alpha1.PropagationPolicyNamespaceLabel: policyNS,
|
|
policyv1alpha1.PropagationPolicyNameLabel: policyName,
|
|
}
|
|
|
|
rbs, err := helper.GetResourceBindings(d.Client, labelSet)
|
|
if err != nil {
|
|
klog.Errorf("Failed to list propagation bindings: %v", err)
|
|
return err
|
|
}
|
|
|
|
for _, binding := range rbs.Items {
|
|
// Cleanup the labels from the object referencing by binding.
|
|
// In addition, this will give the object a chance to match another policy.
|
|
if err := d.CleanupLabels(binding.Spec.Resource, policyv1alpha1.PropagationPolicyNamespaceLabel, policyv1alpha1.PropagationPolicyNameLabel); err != nil {
|
|
klog.Errorf("Failed to cleanup label from resource(%s-%s/%s) when resource binding(%s/%s) removing, error: %v",
|
|
binding.Spec.Resource.Kind, binding.Spec.Resource.Namespace, binding.Spec.Resource.Name, binding.Namespace, binding.Name, err)
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// HandleClusterPropagationPolicyDeletion handles ClusterPropagationPolicy delete event.
|
|
// After a policy is removed, the label marked on relevant resource template will be removed(which gives
|
|
// the resource template a change to match another policy).
|
|
//
|
|
// Note: The relevant ClusterResourceBinding or ResourceBinding will continue to exist until the resource template is gone.
|
|
func (d *ResourceDetector) HandleClusterPropagationPolicyDeletion(policyName string) error {
|
|
var errs []error
|
|
labelSet := labels.Set{
|
|
policyv1alpha1.ClusterPropagationPolicyLabel: policyName,
|
|
}
|
|
|
|
// load the ClusterResourceBindings which labeled with current policy
|
|
crbs, err := helper.GetClusterResourceBindings(d.Client, labelSet)
|
|
if err != nil {
|
|
klog.Errorf("Failed to load cluster resource binding by policy(%s), error: %v", policyName, err)
|
|
errs = append(errs, err)
|
|
} else if len(crbs.Items) > 0 {
|
|
for _, binding := range crbs.Items {
|
|
// Cleanup the labels from the object referencing by binding.
|
|
// In addition, this will give the object a chance to match another policy.
|
|
if err := d.CleanupLabels(binding.Spec.Resource, policyv1alpha1.ClusterPropagationPolicyLabel); err != nil {
|
|
klog.Errorf("Failed to cleanup label from resource(%s-%s/%s) when cluster resource binding(%s) removing, error: %v",
|
|
binding.Spec.Resource.Kind, binding.Spec.Resource.Namespace, binding.Spec.Resource.Name, binding.Name, err)
|
|
errs = append(errs, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// load the ResourceBindings which labeled with current policy
|
|
rbs, err := helper.GetResourceBindings(d.Client, labelSet)
|
|
if err != nil {
|
|
klog.Errorf("Failed to load resource binding by policy(%s), error: %v", policyName, err)
|
|
errs = append(errs, err)
|
|
} else if len(rbs.Items) > 0 {
|
|
for _, binding := range rbs.Items {
|
|
// Cleanup the labels from the object referencing by binding.
|
|
// In addition, this will give the object a chance to match another policy.
|
|
if err := d.CleanupLabels(binding.Spec.Resource, policyv1alpha1.ClusterPropagationPolicyLabel); err != nil {
|
|
klog.Errorf("Failed to cleanup label from resource binding(%s/%s), error: %v", binding.Namespace, binding.Name, err)
|
|
errs = append(errs, err)
|
|
}
|
|
}
|
|
}
|
|
|
|
if len(errs) > 0 {
|
|
return errors.NewAggregate(errs)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// HandlePropagationPolicyCreation handles PropagationPolicy add event.
|
|
// When a new policy arrives, should check if object in waiting list matches the policy, if yes remove the object
|
|
// from waiting list and throw the object to it's reconcile queue. If not, do nothing.
|
|
func (d *ResourceDetector) HandlePropagationPolicyCreation(policy *policyv1alpha1.PropagationPolicy) error {
|
|
matchedKeys := d.GetMatching(policy.Spec.ResourceSelectors)
|
|
klog.Infof("Matched %d resources by policy(%s/%s)", len(matchedKeys), policy.Namespace, policy.Name)
|
|
|
|
// check dependents only when there at least a real match.
|
|
if len(matchedKeys) > 0 {
|
|
// return err when dependents not present, that we can retry at next reconcile.
|
|
if present, err := helper.IsDependentOverridesPresent(d.Client, policy); err != nil || !present {
|
|
klog.Infof("Waiting for dependent overrides present for policy(%s/%s)", policy.Namespace, policy.Name)
|
|
return fmt.Errorf("waiting for dependent overrides")
|
|
}
|
|
}
|
|
|
|
for _, key := range matchedKeys {
|
|
d.RemoveWaiting(key)
|
|
d.Processor.Add(key)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// HandleClusterPropagationPolicyCreation handles ClusterPropagationPolicy add event.
|
|
// When a new policy arrives, should check if object in waiting list matches the policy, if yes remove the object
|
|
// from waiting list and throw the object to it's reconcile queue. If not, do nothing.
|
|
func (d *ResourceDetector) HandleClusterPropagationPolicyCreation(policy *policyv1alpha1.ClusterPropagationPolicy) error {
|
|
matchedKeys := d.GetMatching(policy.Spec.ResourceSelectors)
|
|
klog.Infof("Matched %d resources by policy(%s)", len(matchedKeys), policy.Name)
|
|
|
|
// check dependents only when there at least a real match.
|
|
if len(matchedKeys) > 0 {
|
|
// return err when dependents not present, that we can retry at next reconcile.
|
|
if present, err := helper.IsDependentClusterOverridesPresent(d.Client, policy); err != nil || !present {
|
|
klog.Infof("Waiting for dependent overrides present for policy(%s)", policy.Name)
|
|
return fmt.Errorf("waiting for dependent overrides")
|
|
}
|
|
}
|
|
|
|
for _, key := range matchedKeys {
|
|
d.RemoveWaiting(key)
|
|
d.Processor.Add(key)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// CleanupLabels removes labels from object referencing by objRef.
|
|
func (d *ResourceDetector) CleanupLabels(objRef workv1alpha2.ObjectReference, labels ...string) error {
|
|
workload, err := helper.FetchWorkload(d.DynamicClient, d.InformerManager, d.RESTMapper, objRef)
|
|
if err != nil {
|
|
// do nothing if resource template not exist, it might has been removed.
|
|
if apierrors.IsNotFound(err) {
|
|
return nil
|
|
}
|
|
klog.Errorf("Failed to fetch resource(kind=%s, %s/%s): %v", objRef.Kind, objRef.Namespace, objRef.Name, err)
|
|
return err
|
|
}
|
|
|
|
workloadLabels := workload.GetLabels()
|
|
for _, l := range labels {
|
|
delete(workloadLabels, l)
|
|
}
|
|
workload.SetLabels(workloadLabels)
|
|
|
|
gvr, err := restmapper.GetGroupVersionResource(d.RESTMapper, workload.GroupVersionKind())
|
|
if err != nil {
|
|
klog.Errorf("Failed to delete resource(%s/%s) labels as mapping GVK to GVR failed: %v", workload.GetNamespace(), workload.GetName(), err)
|
|
return err
|
|
}
|
|
|
|
newWorkload, err := d.DynamicClient.Resource(gvr).Namespace(workload.GetNamespace()).Update(context.TODO(), workload, metav1.UpdateOptions{})
|
|
if err != nil {
|
|
klog.Errorf("Failed to update resource %v/%v, err is %v ", workload.GetNamespace(), workload.GetName(), err)
|
|
return err
|
|
}
|
|
klog.V(2).Infof("Updated resource template(kind=%s, %s/%s) successfully", newWorkload.GetKind(), newWorkload.GetNamespace(), newWorkload.GetName())
|
|
return nil
|
|
}
|