Merge pull request #4577 from chaosi-zju/policy5
Introduced a lazy activation preference to PropagationPolicy/ClusterPropagationPolicy
This commit is contained in:
commit
ae5b3fe373
|
@ -17148,6 +17148,10 @@
|
|||
"resourceSelectors"
|
||||
],
|
||||
"properties": {
|
||||
"activationPreference": {
|
||||
"description": "ActivationPreference indicates how the referencing resource template will be propagated, in case of policy changes.\n\nIf empty, the resource template will respond to policy changes immediately, in other words, any policy changes will drive the resource template to be propagated immediately as per the current propagation rules.\n\nIf the value is 'Lazy' means the policy changes will not take effect for now but defer to the resource template changes, in other words, the resource template will not be propagated as per the current propagation rules until there is an update on it. This is an experimental feature that might help in a scenario where a policy manages huge amount of resource templates, changes to a policy typically affect numerous applications simultaneously. A minor misconfiguration could lead to widespread failures. With this feature, the change can be gradually rolled out through iterative modifications of resource templates.",
|
||||
"type": "string"
|
||||
},
|
||||
"association": {
|
||||
"description": "Association tells if relevant resources should be selected automatically. e.g. a ConfigMap referred by a Deployment. default false. Deprecated: in favor of PropagateDeps.",
|
||||
"type": "boolean"
|
||||
|
|
|
@ -44,6 +44,25 @@ spec:
|
|||
spec:
|
||||
description: Spec represents the desired behavior of ClusterPropagationPolicy.
|
||||
properties:
|
||||
activationPreference:
|
||||
description: "ActivationPreference indicates how the referencing resource
|
||||
template will be propagated, in case of policy changes. \n If empty,
|
||||
the resource template will respond to policy changes immediately,
|
||||
in other words, any policy changes will drive the resource template
|
||||
to be propagated immediately as per the current propagation rules.
|
||||
\n If the value is 'Lazy' means the policy changes will not take
|
||||
effect for now but defer to the resource template changes, in other
|
||||
words, the resource template will not be propagated as per the current
|
||||
propagation rules until there is an update on it. This is an experimental
|
||||
feature that might help in a scenario where a policy manages huge
|
||||
amount of resource templates, changes to a policy typically affect
|
||||
numerous applications simultaneously. A minor misconfiguration could
|
||||
lead to widespread failures. With this feature, the change can be
|
||||
gradually rolled out through iterative modifications of resource
|
||||
templates."
|
||||
enum:
|
||||
- Lazy
|
||||
type: string
|
||||
association:
|
||||
description: 'Association tells if relevant resources should be selected
|
||||
automatically. e.g. a ConfigMap referred by a Deployment. default
|
||||
|
|
|
@ -40,6 +40,25 @@ spec:
|
|||
spec:
|
||||
description: Spec represents the desired behavior of PropagationPolicy.
|
||||
properties:
|
||||
activationPreference:
|
||||
description: "ActivationPreference indicates how the referencing resource
|
||||
template will be propagated, in case of policy changes. \n If empty,
|
||||
the resource template will respond to policy changes immediately,
|
||||
in other words, any policy changes will drive the resource template
|
||||
to be propagated immediately as per the current propagation rules.
|
||||
\n If the value is 'Lazy' means the policy changes will not take
|
||||
effect for now but defer to the resource template changes, in other
|
||||
words, the resource template will not be propagated as per the current
|
||||
propagation rules until there is an update on it. This is an experimental
|
||||
feature that might help in a scenario where a policy manages huge
|
||||
amount of resource templates, changes to a policy typically affect
|
||||
numerous applications simultaneously. A minor misconfiguration could
|
||||
lead to widespread failures. With this feature, the change can be
|
||||
gradually rolled out through iterative modifications of resource
|
||||
templates."
|
||||
enum:
|
||||
- Lazy
|
||||
type: string
|
||||
association:
|
||||
description: 'Association tells if relevant resources should be selected
|
||||
automatically. e.g. a ConfigMap referred by a Deployment. default
|
||||
|
|
|
@ -152,6 +152,27 @@ type PropagationSpec struct {
|
|||
// +kubebuilder:validation:Enum=Abort;Overwrite
|
||||
// +optional
|
||||
ConflictResolution ConflictResolution `json:"conflictResolution,omitempty"`
|
||||
|
||||
// ActivationPreference indicates how the referencing resource template will
|
||||
// be propagated, in case of policy changes.
|
||||
//
|
||||
// If empty, the resource template will respond to policy changes
|
||||
// immediately, in other words, any policy changes will drive the resource
|
||||
// template to be propagated immediately as per the current propagation rules.
|
||||
//
|
||||
// If the value is 'Lazy' means the policy changes will not take effect for now
|
||||
// but defer to the resource template changes, in other words, the resource
|
||||
// template will not be propagated as per the current propagation rules until
|
||||
// there is an update on it.
|
||||
// This is an experimental feature that might help in a scenario where a policy
|
||||
// manages huge amount of resource templates, changes to a policy typically
|
||||
// affect numerous applications simultaneously. A minor misconfiguration
|
||||
// could lead to widespread failures. With this feature, the change can be
|
||||
// gradually rolled out through iterative modifications of resource templates.
|
||||
//
|
||||
// +kubebuilder:validation:Enum=Lazy
|
||||
// +optional
|
||||
ActivationPreference ActivationPreference `json:"activationPreference,omitempty"`
|
||||
}
|
||||
|
||||
// ResourceSelector the resources will be selected.
|
||||
|
@ -518,6 +539,16 @@ const (
|
|||
ConflictAbort ConflictResolution = "Abort"
|
||||
)
|
||||
|
||||
// ActivationPreference indicates how the referencing resource template will be propagated, in case of policy changes.
|
||||
type ActivationPreference string
|
||||
|
||||
const (
|
||||
// LazyActivation means the policy changes will not take effect for now but defer to the resource template changes,
|
||||
// in other words, the resource template will not be propagated as per the current propagation rules until
|
||||
// there is an update on it.
|
||||
LazyActivation ActivationPreference = "Lazy"
|
||||
)
|
||||
|
||||
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
|
||||
|
||||
// PropagationPolicyList contains a list of PropagationPolicy.
|
||||
|
|
|
@ -154,7 +154,7 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
|
|||
|
||||
detectorWorkerOptions := util.Options{
|
||||
Name: "resource detector",
|
||||
KeyFunc: ClusterWideKeyFunc,
|
||||
KeyFunc: ResourceItemKeyFunc,
|
||||
ReconcileFunc: d.Reconcile,
|
||||
RateLimiterOptions: d.RateLimiterOptions,
|
||||
}
|
||||
|
@ -226,11 +226,14 @@ func (d *ResourceDetector) NeedLeaderElection() bool {
|
|||
// Reconcile performs a full reconciliation for the object referred to by the key.
|
||||
// The key will be re-queued if an error is non-nil.
|
||||
func (d *ResourceDetector) Reconcile(key util.QueueKey) error {
|
||||
clusterWideKey, ok := key.(keys.ClusterWideKey)
|
||||
clusterWideKeyWithConfig, ok := key.(keys.ClusterWideKeyWithConfig)
|
||||
if !ok {
|
||||
klog.Error("Invalid key")
|
||||
return fmt.Errorf("invalid key")
|
||||
}
|
||||
|
||||
clusterWideKey := clusterWideKeyWithConfig.ClusterWideKey
|
||||
resourceChangeByKarmada := clusterWideKeyWithConfig.ResourceChangeByKarmada
|
||||
klog.Infof("Reconciling object: %s", clusterWideKey)
|
||||
|
||||
object, err := d.GetUnstructuredObject(clusterWideKey)
|
||||
|
@ -243,7 +246,7 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error {
|
|||
// currently we do that by setting owner reference to derived objects.
|
||||
return nil
|
||||
}
|
||||
klog.Errorf("Failed to get unstructured object(%s), error: %v", clusterWideKey, err)
|
||||
klog.Errorf("Failed to get unstructured object(%s), error: %v", clusterWideKeyWithConfig, err)
|
||||
return err
|
||||
}
|
||||
|
||||
|
@ -255,7 +258,7 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
return d.propagateResource(object, clusterWideKey)
|
||||
return d.propagateResource(object, clusterWideKey, resourceChangeByKarmada)
|
||||
}
|
||||
|
||||
// EventFilter tells if an object should be taken care of.
|
||||
|
@ -320,7 +323,7 @@ func (d *ResourceDetector) OnAdd(obj interface{}) {
|
|||
if !ok {
|
||||
return
|
||||
}
|
||||
d.Processor.Enqueue(runtimeObj)
|
||||
d.Processor.Enqueue(ResourceItem{Obj: runtimeObj})
|
||||
}
|
||||
|
||||
// OnUpdate handles object update event and push the object to queue.
|
||||
|
@ -337,12 +340,25 @@ func (d *ResourceDetector) OnUpdate(oldObj, newObj interface{}) {
|
|||
return
|
||||
}
|
||||
|
||||
newRuntimeObj, ok := newObj.(runtime.Object)
|
||||
if !ok {
|
||||
klog.Errorf("Failed to assert newObj as runtime.Object")
|
||||
return
|
||||
}
|
||||
|
||||
if !eventfilter.SpecificationChanged(unstructuredOldObj, unstructuredNewObj) {
|
||||
klog.V(4).Infof("Ignore update event of object (kind=%s, %s/%s) as specification no change", unstructuredOldObj.GetKind(), unstructuredOldObj.GetNamespace(), unstructuredOldObj.GetName())
|
||||
return
|
||||
}
|
||||
|
||||
d.OnAdd(newObj)
|
||||
resourceChangeByKarmada := eventfilter.ResourceChangeByKarmada(unstructuredOldObj, unstructuredNewObj)
|
||||
|
||||
resourceItem := ResourceItem{
|
||||
Obj: newRuntimeObj,
|
||||
ResourceChangeByKarmada: resourceChangeByKarmada,
|
||||
}
|
||||
|
||||
d.Processor.Enqueue(resourceItem)
|
||||
}
|
||||
|
||||
// OnDelete handles object delete event and push the object to queue.
|
||||
|
@ -407,7 +423,8 @@ func (d *ResourceDetector) LookForMatchedClusterPolicy(object *unstructured.Unst
|
|||
}
|
||||
|
||||
// ApplyPolicy starts propagate the object referenced by object key according to PropagationPolicy.
|
||||
func (d *ResourceDetector) ApplyPolicy(object *unstructured.Unstructured, objectKey keys.ClusterWideKey, policy *policyv1alpha1.PropagationPolicy) (err error) {
|
||||
func (d *ResourceDetector) ApplyPolicy(object *unstructured.Unstructured, objectKey keys.ClusterWideKey,
|
||||
resourceChangeByKarmada bool, policy *policyv1alpha1.PropagationPolicy) (err error) {
|
||||
start := time.Now()
|
||||
klog.Infof("Applying policy(%s/%s) for object: %s", policy.Namespace, policy.Name, objectKey)
|
||||
var operationResult controllerutil.OperationResult
|
||||
|
@ -426,6 +443,15 @@ func (d *ResourceDetector) ApplyPolicy(object *unstructured.Unstructured, object
|
|||
return err
|
||||
}
|
||||
|
||||
// If this Reconcile action is triggered by Karmada itself and the current bound Policy is lazy activation preference,
|
||||
// resource will delay to sync the placement from Policy to Binding util resource is updated by User.
|
||||
if resourceChangeByKarmada && util.IsLazyActivationEnabled(policy.Spec.ActivationPreference) {
|
||||
operationResult = controllerutil.OperationResultNone
|
||||
klog.Infof("Skip refresh Binding for the change of resource (%s/%s) is from Karmada and activation "+
|
||||
"preference of current bound policy (%s) is enabled.", object.GetNamespace(), object.GetName(), policy.Name)
|
||||
return nil
|
||||
}
|
||||
|
||||
policyLabels := map[string]string{
|
||||
policyv1alpha1.PropagationPolicyNamespaceLabel: policy.GetNamespace(),
|
||||
policyv1alpha1.PropagationPolicyNameLabel: policy.GetName(),
|
||||
|
@ -497,7 +523,8 @@ func (d *ResourceDetector) ApplyPolicy(object *unstructured.Unstructured, object
|
|||
|
||||
// ApplyClusterPolicy starts propagate the object referenced by object key according to ClusterPropagationPolicy.
|
||||
// nolint:gocyclo
|
||||
func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured, objectKey keys.ClusterWideKey, policy *policyv1alpha1.ClusterPropagationPolicy) (err error) {
|
||||
func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured, objectKey keys.ClusterWideKey,
|
||||
resourceChangeByKarmada bool, policy *policyv1alpha1.ClusterPropagationPolicy) (err error) {
|
||||
start := time.Now()
|
||||
klog.Infof("Applying cluster policy(%s) for object: %s", policy.Name, objectKey)
|
||||
var operationResult controllerutil.OperationResult
|
||||
|
@ -516,6 +543,15 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured,
|
|||
return err
|
||||
}
|
||||
|
||||
// If this Reconcile action is triggered by Karmada itself and the current bound Policy is lazy activation preference,
|
||||
// resource will delay to sync the placement from Policy to Binding util resource is updated by User.
|
||||
if resourceChangeByKarmada && util.IsLazyActivationEnabled(policy.Spec.ActivationPreference) {
|
||||
operationResult = controllerutil.OperationResultNone
|
||||
klog.Infof("Skip refresh Binding for the change of resource (%s/%s) is from Karmada and activation "+
|
||||
"preference of current bound cluster policy (%s) is enabled.", object.GetNamespace(), object.GetName(), policy.Name)
|
||||
return nil
|
||||
}
|
||||
|
||||
policyLabels := map[string]string{
|
||||
policyv1alpha1.ClusterPropagationPolicyLabel: policy.GetName(),
|
||||
policyv1alpha1.ClusterPropagationPolicyPermanentIDLabel: policyID,
|
||||
|
@ -1124,7 +1160,7 @@ func (d *ResourceDetector) ReconcileClusterPropagationPolicy(key util.QueueKey)
|
|||
}
|
||||
|
||||
// HandlePropagationPolicyDeletion handles PropagationPolicy delete event.
|
||||
// After a policy is removed, the label marked on relevant resource template will be removed(which gives
|
||||
// After a policy is removed, the label marked on relevant resource template will be removed (which gives
|
||||
// the resource template a change to match another policy).
|
||||
//
|
||||
// Note: The relevant ResourceBinding will continue to exist until the resource template is gone.
|
||||
|
@ -1161,7 +1197,7 @@ func (d *ResourceDetector) HandlePropagationPolicyDeletion(policyNS string, poli
|
|||
}
|
||||
|
||||
// HandleClusterPropagationPolicyDeletion handles ClusterPropagationPolicy delete event.
|
||||
// After a policy is removed, the label marked on relevant resource template will be removed(which gives
|
||||
// After a policy is removed, the label marked on relevant resource template will be removed (which gives
|
||||
// the resource template a change to match another policy).
|
||||
//
|
||||
// Note: The relevant ClusterResourceBinding or ResourceBinding will continue to exist until the resource template is gone.
|
||||
|
@ -1234,6 +1270,8 @@ func (d *ResourceDetector) HandleClusterPropagationPolicyDeletion(policyName str
|
|||
// from waiting list and throw the object to it's reconcile queue. If not, do nothing.
|
||||
// Finally, handle the propagation policy preemption process if preemption is enabled.
|
||||
func (d *ResourceDetector) HandlePropagationPolicyCreationOrUpdate(policy *policyv1alpha1.PropagationPolicy) error {
|
||||
// If the Policy's ResourceSelectors change, causing certain resources to no longer match the Policy, the label marked
|
||||
// on relevant resource template will be removed (which gives the resource template a change to match another policy).
|
||||
err := d.cleanPPUnmatchedResourceBindings(policy.Namespace, policy.Name, policy.Spec.ResourceSelectors)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -1251,9 +1289,10 @@ func (d *ResourceDetector) HandlePropagationPolicyCreationOrUpdate(policy *polic
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.Processor.Add(resourceKey)
|
||||
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: resourceKey, ResourceChangeByKarmada: true})
|
||||
}
|
||||
|
||||
// check whether there are matched RT in waiting list, is so, add it to processor
|
||||
matchedKeys := d.GetMatching(policy.Spec.ResourceSelectors)
|
||||
klog.Infof("Matched %d resources by policy(%s/%s)", len(matchedKeys), policy.Namespace, policy.Name)
|
||||
|
||||
|
@ -1268,10 +1307,12 @@ func (d *ResourceDetector) HandlePropagationPolicyCreationOrUpdate(policy *polic
|
|||
|
||||
for _, key := range matchedKeys {
|
||||
d.RemoveWaiting(key)
|
||||
d.Processor.Add(key)
|
||||
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: key, ResourceChangeByKarmada: true})
|
||||
}
|
||||
|
||||
// if preemption is enabled, handle the preemption process.
|
||||
// If preemption is enabled, handle the preemption process.
|
||||
// If this policy succeeds in preempting resource managed by other policy, the label marked on relevant resource
|
||||
// will be replaced, which gives the resource template a change to match to this policy.
|
||||
if preemptionEnabled(policy.Spec.Preemption) {
|
||||
return d.handlePropagationPolicyPreemption(policy)
|
||||
}
|
||||
|
@ -1286,6 +1327,8 @@ func (d *ResourceDetector) HandlePropagationPolicyCreationOrUpdate(policy *polic
|
|||
// from waiting list and throw the object to it's reconcile queue. If not, do nothing.
|
||||
// Finally, handle the cluster propagation policy preemption process if preemption is enabled.
|
||||
func (d *ResourceDetector) HandleClusterPropagationPolicyCreationOrUpdate(policy *policyv1alpha1.ClusterPropagationPolicy) error {
|
||||
// If the Policy's ResourceSelectors change, causing certain resources to no longer match the Policy, the label marked
|
||||
// on relevant resource template will be removed (which gives the resource template a change to match another policy).
|
||||
err := d.cleanCPPUnmatchedResourceBindings(policy.Name, policy.Spec.ResourceSelectors)
|
||||
if err != nil {
|
||||
return err
|
||||
|
@ -1312,14 +1355,14 @@ func (d *ResourceDetector) HandleClusterPropagationPolicyCreationOrUpdate(policy
|
|||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.Processor.Add(resourceKey)
|
||||
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: resourceKey, ResourceChangeByKarmada: true})
|
||||
}
|
||||
for _, crb := range clusterResourceBindings.Items {
|
||||
resourceKey, err := helper.ConstructClusterWideKey(crb.Spec.Resource)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
d.Processor.Add(resourceKey)
|
||||
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: resourceKey, ResourceChangeByKarmada: true})
|
||||
}
|
||||
|
||||
matchedKeys := d.GetMatching(policy.Spec.ResourceSelectors)
|
||||
|
@ -1336,10 +1379,12 @@ func (d *ResourceDetector) HandleClusterPropagationPolicyCreationOrUpdate(policy
|
|||
|
||||
for _, key := range matchedKeys {
|
||||
d.RemoveWaiting(key)
|
||||
d.Processor.Add(key)
|
||||
d.Processor.Add(keys.ClusterWideKeyWithConfig{ClusterWideKey: key, ResourceChangeByKarmada: true})
|
||||
}
|
||||
|
||||
// if preemption is enabled, handle the preemption process.
|
||||
// If preemption is enabled, handle the preemption process.
|
||||
// If this policy succeeds in preempting resource managed by other policy, the label marked on relevant resource
|
||||
// will be replaced, which gives the resource template a change to match to this policy.
|
||||
if preemptionEnabled(policy.Spec.Preemption) {
|
||||
return d.handleClusterPropagationPolicyPreemption(policy)
|
||||
}
|
||||
|
|
|
@ -17,6 +17,10 @@ limitations under the License.
|
|||
package detector
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
|
||||
"github.com/karmada-io/karmada/pkg/util"
|
||||
"github.com/karmada-io/karmada/pkg/util/fedinformer/keys"
|
||||
)
|
||||
|
@ -25,3 +29,33 @@ import (
|
|||
func ClusterWideKeyFunc(obj interface{}) (util.QueueKey, error) {
|
||||
return keys.ClusterWideKeyFunc(obj)
|
||||
}
|
||||
|
||||
const (
|
||||
// ObjectChangedByKarmada the key name for a bool value which describes whether the object is changed by Karmada
|
||||
ObjectChangedByKarmada = "ObjectChangedByKarmada"
|
||||
)
|
||||
|
||||
// ResourceItem a object key with certain extended config
|
||||
type ResourceItem struct {
|
||||
Obj runtime.Object
|
||||
ResourceChangeByKarmada bool
|
||||
}
|
||||
|
||||
// ResourceItemKeyFunc generates a ClusterWideKeyWithConfig for object.
|
||||
func ResourceItemKeyFunc(obj interface{}) (util.QueueKey, error) {
|
||||
var err error
|
||||
key := keys.ClusterWideKeyWithConfig{}
|
||||
|
||||
resourceItem, ok := obj.(ResourceItem)
|
||||
if !ok {
|
||||
return key, fmt.Errorf("failed to assert object as ResourceItem")
|
||||
}
|
||||
|
||||
key.ResourceChangeByKarmada = resourceItem.ResourceChangeByKarmada
|
||||
key.ClusterWideKey, err = keys.ClusterWideKeyFunc(resourceItem.Obj)
|
||||
if err != nil {
|
||||
return key, err
|
||||
}
|
||||
|
||||
return key, nil
|
||||
}
|
||||
|
|
|
@ -36,21 +36,22 @@ import (
|
|||
"github.com/karmada-io/karmada/pkg/util/helper"
|
||||
)
|
||||
|
||||
func (d *ResourceDetector) propagateResource(object *unstructured.Unstructured, objectKey keys.ClusterWideKey) error {
|
||||
func (d *ResourceDetector) propagateResource(object *unstructured.Unstructured,
|
||||
objectKey keys.ClusterWideKey, resourceChangeByKarmada bool) error {
|
||||
// 1. Check if the object has been claimed by a PropagationPolicy,
|
||||
// if so, just apply it.
|
||||
policyLabels := object.GetLabels()
|
||||
claimedNamespace := util.GetLabelValue(policyLabels, policyv1alpha1.PropagationPolicyNamespaceLabel)
|
||||
claimedName := util.GetLabelValue(policyLabels, policyv1alpha1.PropagationPolicyNameLabel)
|
||||
if claimedNamespace != "" && claimedName != "" {
|
||||
return d.getAndApplyPolicy(object, objectKey, claimedNamespace, claimedName)
|
||||
return d.getAndApplyPolicy(object, objectKey, resourceChangeByKarmada, claimedNamespace, claimedName)
|
||||
}
|
||||
|
||||
// 2. Check if the object has been claimed by a ClusterPropagationPolicy,
|
||||
// if so, just apply it.
|
||||
claimedName = util.GetLabelValue(policyLabels, policyv1alpha1.ClusterPropagationPolicyLabel)
|
||||
if claimedName != "" {
|
||||
return d.getAndApplyClusterPolicy(object, objectKey, claimedName)
|
||||
return d.getAndApplyClusterPolicy(object, objectKey, resourceChangeByKarmada, claimedName)
|
||||
}
|
||||
|
||||
// 3. attempt to match policy in its namespace.
|
||||
|
@ -68,7 +69,7 @@ func (d *ResourceDetector) propagateResource(object *unstructured.Unstructured,
|
|||
}
|
||||
d.RemoveWaiting(objectKey)
|
||||
metrics.ObserveFindMatchedPolicyLatency(start)
|
||||
return d.ApplyPolicy(object, objectKey, propagationPolicy)
|
||||
return d.ApplyPolicy(object, objectKey, resourceChangeByKarmada, propagationPolicy)
|
||||
}
|
||||
|
||||
// 4. reaching here means there is no appropriate PropagationPolicy, attempt to match a ClusterPropagationPolicy.
|
||||
|
@ -85,7 +86,7 @@ func (d *ResourceDetector) propagateResource(object *unstructured.Unstructured,
|
|||
}
|
||||
d.RemoveWaiting(objectKey)
|
||||
metrics.ObserveFindMatchedPolicyLatency(start)
|
||||
return d.ApplyClusterPolicy(object, objectKey, clusterPolicy)
|
||||
return d.ApplyClusterPolicy(object, objectKey, resourceChangeByKarmada, clusterPolicy)
|
||||
}
|
||||
|
||||
if d.isWaiting(objectKey) {
|
||||
|
@ -100,7 +101,8 @@ func (d *ResourceDetector) propagateResource(object *unstructured.Unstructured,
|
|||
return fmt.Errorf("no matched propagation policy")
|
||||
}
|
||||
|
||||
func (d *ResourceDetector) getAndApplyPolicy(object *unstructured.Unstructured, objectKey keys.ClusterWideKey, policyNamespace, policyName string) error {
|
||||
func (d *ResourceDetector) getAndApplyPolicy(object *unstructured.Unstructured, objectKey keys.ClusterWideKey,
|
||||
resourceChangeByKarmada bool, policyNamespace, policyName string) error {
|
||||
policyObject, err := d.propagationPolicyLister.ByNamespace(policyNamespace).Get(policyName)
|
||||
if err != nil {
|
||||
if apierrors.IsNotFound(err) {
|
||||
|
@ -134,10 +136,11 @@ func (d *ResourceDetector) getAndApplyPolicy(object *unstructured.Unstructured,
|
|||
return fmt.Errorf("waiting for dependent overrides")
|
||||
}
|
||||
|
||||
return d.ApplyPolicy(object, objectKey, matchedPropagationPolicy)
|
||||
return d.ApplyPolicy(object, objectKey, resourceChangeByKarmada, matchedPropagationPolicy)
|
||||
}
|
||||
|
||||
func (d *ResourceDetector) getAndApplyClusterPolicy(object *unstructured.Unstructured, objectKey keys.ClusterWideKey, policyName string) error {
|
||||
func (d *ResourceDetector) getAndApplyClusterPolicy(object *unstructured.Unstructured, objectKey keys.ClusterWideKey,
|
||||
resourceChangeByKarmada bool, policyName string) error {
|
||||
policyObject, err := d.clusterPropagationPolicyLister.Get(policyName)
|
||||
if err != nil {
|
||||
if apierrors.IsNotFound(err) {
|
||||
|
@ -172,7 +175,7 @@ func (d *ResourceDetector) getAndApplyClusterPolicy(object *unstructured.Unstruc
|
|||
return fmt.Errorf("waiting for dependent overrides")
|
||||
}
|
||||
|
||||
return d.ApplyClusterPolicy(object, objectKey, matchedClusterPropagationPolicy)
|
||||
return d.ApplyClusterPolicy(object, objectKey, resourceChangeByKarmada, matchedClusterPropagationPolicy)
|
||||
}
|
||||
|
||||
func (d *ResourceDetector) cleanPPUnmatchedResourceBindings(policyNamespace, policyName string, selectors []policyv1alpha1.ResourceSelector) error {
|
||||
|
|
|
@ -4474,6 +4474,13 @@ func schema_pkg_apis_policy_v1alpha1_PropagationSpec(ref common.ReferenceCallbac
|
|||
Format: "",
|
||||
},
|
||||
},
|
||||
"activationPreference": {
|
||||
SchemaProps: spec.SchemaProps{
|
||||
Description: "ActivationPreference indicates how the referencing resource template will be propagated, in case of policy changes.\n\nIf empty, the resource template will respond to policy changes immediately, in other words, any policy changes will drive the resource template to be propagated immediately as per the current propagation rules.\n\nIf the value is 'Lazy' means the policy changes will not take effect for now but defer to the resource template changes, in other words, the resource template will not be propagated as per the current propagation rules until there is an update on it. This is an experimental feature that might help in a scenario where a policy manages huge amount of resource templates, changes to a policy typically affect numerous applications simultaneously. A minor misconfiguration could lead to widespread failures. With this feature, the change can be gradually rolled out through iterative modifications of resource templates.",
|
||||
Type: []string{"string"},
|
||||
Format: "",
|
||||
},
|
||||
},
|
||||
},
|
||||
Required: []string{"resourceSelectors"},
|
||||
},
|
||||
|
|
|
@ -18,22 +18,87 @@ package eventfilter
|
|||
|
||||
import (
|
||||
"reflect"
|
||||
"strings"
|
||||
|
||||
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
|
||||
|
||||
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
|
||||
)
|
||||
|
||||
// labelOrAnnoKeyPrefixByKarmada defines the key prefix used for labels or annotations used by karmada's own components.
|
||||
const labelOrAnnoKeyPrefixByKarmada = ".karmada.io"
|
||||
|
||||
// labelsForUserWithKarmadaPrefix enumerates special cases that labels use the karmada prefix but are really only for users to use.
|
||||
var labelsForUserWithKarmadaPrefix = map[string]struct{}{
|
||||
policyv1alpha1.NamespaceSkipAutoPropagationLabel: {},
|
||||
}
|
||||
|
||||
// SpecificationChanged check if the specification of the given object change or not
|
||||
func SpecificationChanged(oldObj, newObj *unstructured.Unstructured) bool {
|
||||
oldBackup := oldObj.DeepCopy()
|
||||
newBackup := newObj.DeepCopy()
|
||||
|
||||
// Remove the status and some system defined mutable fields in metadata, including managedFields and resourceVersion.
|
||||
// Refer to https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/object-meta/#ObjectMeta for more details.
|
||||
removeIgnoredFields(oldBackup, newBackup)
|
||||
|
||||
return !reflect.DeepEqual(oldBackup, newBackup)
|
||||
}
|
||||
|
||||
// ResourceChangeByKarmada check if the change of the object is modified by Karmada.
|
||||
// If oldObj deep equal to newObj after removing fields changed by Karmada, such change refers to ChangeByKarmada.
|
||||
func ResourceChangeByKarmada(oldObj, newObj *unstructured.Unstructured) bool {
|
||||
oldBackup := oldObj.DeepCopy()
|
||||
newBackup := newObj.DeepCopy()
|
||||
|
||||
removeIgnoredFields(oldBackup, newBackup)
|
||||
|
||||
removeFieldsChangedByKarmada(oldBackup, newBackup)
|
||||
|
||||
return reflect.DeepEqual(oldBackup, newBackup)
|
||||
}
|
||||
|
||||
// removeIgnoredFields Remove the status and some system defined mutable fields in metadata, including managedFields and resourceVersion.
|
||||
// Refer to https://kubernetes.io/docs/reference/kubernetes-api/common-definitions/object-meta/#ObjectMeta for more details.
|
||||
func removeIgnoredFields(oldBackup, newBackup *unstructured.Unstructured) {
|
||||
removedFields := [][]string{{"status"}, {"metadata", "managedFields"}, {"metadata", "resourceVersion"}}
|
||||
for _, r := range removedFields {
|
||||
unstructured.RemoveNestedField(oldBackup.Object, r...)
|
||||
unstructured.RemoveNestedField(newBackup.Object, r...)
|
||||
}
|
||||
|
||||
return !reflect.DeepEqual(oldBackup, newBackup)
|
||||
}
|
||||
|
||||
// removeFieldsChangedByKarmada Remove the fields modified by karmada's own components.
|
||||
func removeFieldsChangedByKarmada(oldBackup, newBackup *unstructured.Unstructured) {
|
||||
// Remove label or annotations changed by Karmada
|
||||
removeLabelOrAnnotationByKarmada(oldBackup)
|
||||
removeLabelOrAnnotationByKarmada(newBackup)
|
||||
|
||||
// Karmada's change may result in some auto-generated fields being modified indirectly, e.g: metadata.generation
|
||||
// They should also be removed before comparing.
|
||||
removedFields := [][]string{{"metadata", "generation"}}
|
||||
for _, r := range removedFields {
|
||||
unstructured.RemoveNestedField(oldBackup.Object, r...)
|
||||
unstructured.RemoveNestedField(newBackup.Object, r...)
|
||||
}
|
||||
}
|
||||
|
||||
// removeLabelOrAnnotationByKarmada remove the label or annotation modified by karmada's own components.
|
||||
func removeLabelOrAnnotationByKarmada(unstructuredObj *unstructured.Unstructured) {
|
||||
for k := range unstructuredObj.GetLabels() {
|
||||
_, isLabelForUser := labelsForUserWithKarmadaPrefix[k]
|
||||
if strings.Contains(k, labelOrAnnoKeyPrefixByKarmada) && !isLabelForUser {
|
||||
unstructured.RemoveNestedField(unstructuredObj.Object, []string{"metadata", "labels", k}...)
|
||||
}
|
||||
}
|
||||
if len(unstructuredObj.GetLabels()) == 0 {
|
||||
unstructured.RemoveNestedField(unstructuredObj.Object, []string{"metadata", "labels"}...)
|
||||
}
|
||||
|
||||
for k := range unstructuredObj.GetAnnotations() {
|
||||
if strings.Contains(k, labelOrAnnoKeyPrefixByKarmada) {
|
||||
unstructured.RemoveNestedField(unstructuredObj.Object, []string{"metadata", "annotations", k}...)
|
||||
}
|
||||
}
|
||||
if len(unstructuredObj.GetAnnotations()) == 0 {
|
||||
unstructured.RemoveNestedField(unstructuredObj.Object, []string{"metadata", "annotations"}...)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -24,7 +24,7 @@ import (
|
|||
)
|
||||
|
||||
// NewHandlerOnAllEvents builds a ResourceEventHandler that the function 'fn' will be called on all events(add/update/delete).
|
||||
func NewHandlerOnAllEvents(fn func(runtime.Object)) cache.ResourceEventHandler {
|
||||
func NewHandlerOnAllEvents(fn func(interface{})) cache.ResourceEventHandler {
|
||||
return &cache.ResourceEventHandlerFuncs{
|
||||
AddFunc: func(cur interface{}) {
|
||||
curObj := cur.(runtime.Object)
|
||||
|
|
|
@ -111,6 +111,21 @@ func ClusterWideKeyFunc(obj interface{}) (ClusterWideKey, error) {
|
|||
return key, nil
|
||||
}
|
||||
|
||||
// ClusterWideKeyWithConfig is the object key which is a unique identifier under a cluster, combined with certain config.
|
||||
type ClusterWideKeyWithConfig struct {
|
||||
// ClusterWideKey is the object key which is a unique identifier under a cluster, across all resources.
|
||||
ClusterWideKey ClusterWideKey
|
||||
|
||||
// ResourceChangeByKarmada defines whether resource is changed by Karmada
|
||||
ResourceChangeByKarmada bool
|
||||
}
|
||||
|
||||
// String returns the key's printable info with format:
|
||||
// "<GroupVersion>, kind=<Kind>, <NamespaceKey>, ResourceChangeByKarmada=<ResourceChangeByKarmada>"
|
||||
func (k ClusterWideKeyWithConfig) String() string {
|
||||
return fmt.Sprintf("%s, ResourceChangeByKarmada=%v", k.ClusterWideKey.String(), k.ResourceChangeByKarmada)
|
||||
}
|
||||
|
||||
// FederatedKey is the object key which is a unique identifier across all clusters in federation.
|
||||
type FederatedKey struct {
|
||||
// Cluster is the cluster name of the referencing object.
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
/*
|
||||
Copyright 2024 The Karmada Authors.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package util
|
||||
|
||||
import policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
|
||||
|
||||
// IsLazyActivationEnabled judge whether lazy activation preference is enabled.
|
||||
func IsLazyActivationEnabled(activationPreference policyv1alpha1.ActivationPreference) bool {
|
||||
if activationPreference == "" {
|
||||
return false
|
||||
}
|
||||
return activationPreference == policyv1alpha1.LazyActivation
|
||||
}
|
|
@ -19,7 +19,6 @@ package util
|
|||
import (
|
||||
"time"
|
||||
|
||||
"k8s.io/apimachinery/pkg/runtime"
|
||||
"k8s.io/apimachinery/pkg/util/wait"
|
||||
"k8s.io/client-go/tools/cache"
|
||||
"k8s.io/client-go/util/workqueue"
|
||||
|
@ -39,7 +38,7 @@ type AsyncWorker interface {
|
|||
AddAfter(item interface{}, duration time.Duration)
|
||||
|
||||
// Enqueue generates the key of 'obj' according to a 'KeyFunc' then adds the key as an item to queue by 'Add'.
|
||||
Enqueue(obj runtime.Object)
|
||||
Enqueue(obj interface{})
|
||||
|
||||
// Run starts a certain number of concurrent workers to reconcile the items and will never stop until 'stopChan'
|
||||
// is closed.
|
||||
|
@ -90,10 +89,10 @@ func NewAsyncWorker(opt Options) AsyncWorker {
|
|||
}
|
||||
}
|
||||
|
||||
func (w *asyncWorker) Enqueue(obj runtime.Object) {
|
||||
func (w *asyncWorker) Enqueue(obj interface{}) {
|
||||
key, err := w.keyFunc(obj)
|
||||
if err != nil {
|
||||
klog.Warningf("Failed to generate key for obj: %s", obj.GetObjectKind().GroupVersionKind())
|
||||
klog.Warningf("Failed to generate key for obj: %+v", obj)
|
||||
return
|
||||
}
|
||||
|
||||
|
|
Loading…
Reference in New Issue