Merge pull request #1368 from dddddai/detector
Rework "bugfix: resource binding not created occasionally"
This commit is contained in:
commit
1860fb2af9
|
@ -55,8 +55,14 @@ type ResourceDetector struct {
|
|||
// ResourceInterpreter knows the details of resource structure.
|
||||
ResourceInterpreter resourceinterpreter.ResourceInterpreter
|
||||
EventRecorder record.EventRecorder
|
||||
// policyReconcileWorker maintains a rate limited queue which used to store PropagationPolicy's key and
|
||||
// a reconcile function to consume the items in queue.
|
||||
policyReconcileWorker util.AsyncWorker
|
||||
propagationPolicyLister cache.GenericLister
|
||||
|
||||
propagationPolicyLister cache.GenericLister
|
||||
// clusterPolicyReconcileWorker maintains a rate limited queue which used to store ClusterPropagationPolicy's key and
|
||||
// a reconcile function to consume the items in queue.
|
||||
clusterPolicyReconcileWorker util.AsyncWorker
|
||||
clusterPropagationPolicyLister cache.GenericLister
|
||||
|
||||
// bindingReconcileWorker maintains a rate limited queue which used to store ResourceBinding's key and
|
||||
|
@ -80,17 +86,30 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
|
|||
d.waitingObjects = make(map[keys.ClusterWideKey]struct{})
|
||||
d.stopCh = ctx.Done()
|
||||
|
||||
// setup policy reconcile worker
|
||||
d.policyReconcileWorker = util.NewAsyncWorker("propagationPolicy reconciler", ClusterWideKeyFunc, d.ReconcilePropagationPolicy)
|
||||
d.policyReconcileWorker.Run(1, d.stopCh)
|
||||
d.clusterPolicyReconcileWorker = util.NewAsyncWorker("clusterPropagationPolicy reconciler", ClusterWideKeyFunc, d.ReconcileClusterPropagationPolicy)
|
||||
d.clusterPolicyReconcileWorker.Run(1, d.stopCh)
|
||||
|
||||
// watch and enqueue PropagationPolicy changes.
|
||||
propagationPolicyGVR := schema.GroupVersionResource{
|
||||
Group: policyv1alpha1.GroupVersion.Group,
|
||||
Version: policyv1alpha1.GroupVersion.Version,
|
||||
Resource: "propagationpolicies",
|
||||
}
|
||||
policyHandler := informermanager.NewHandlerOnEvents(d.OnPropagationPolicyAdd, d.OnPropagationPolicyUpdate, d.OnPropagationPolicyDelete)
|
||||
d.InformerManager.ForResource(propagationPolicyGVR, policyHandler)
|
||||
d.propagationPolicyLister = d.InformerManager.Lister(propagationPolicyGVR)
|
||||
|
||||
// watch and enqueue ClusterPropagationPolicy changes.
|
||||
clusterPropagationPolicyGVR := schema.GroupVersionResource{
|
||||
Group: policyv1alpha1.GroupVersion.Group,
|
||||
Version: policyv1alpha1.GroupVersion.Version,
|
||||
Resource: "clusterpropagationpolicies",
|
||||
}
|
||||
d.propagationPolicyLister = d.InformerManager.Lister(propagationPolicyGVR)
|
||||
clusterPolicyHandler := informermanager.NewHandlerOnEvents(d.OnClusterPropagationPolicyAdd, d.OnClusterPropagationPolicyUpdate, d.OnClusterPropagationPolicyDelete)
|
||||
d.InformerManager.ForResource(clusterPropagationPolicyGVR, clusterPolicyHandler)
|
||||
d.clusterPropagationPolicyLister = d.InformerManager.Lister(clusterPropagationPolicyGVR)
|
||||
|
||||
// setup binding reconcile worker
|
||||
|
@ -116,9 +135,7 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
|
|||
clusterBindingHandler := informermanager.NewHandlerOnEvents(d.OnClusterResourceBindingAdd, d.OnClusterResourceBindingUpdate, nil)
|
||||
d.InformerManager.ForResource(clusterResourceBindingGVR, clusterBindingHandler)
|
||||
|
||||
d.EventHandler = informermanager.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete)
|
||||
d.InformerManager.ForResource(propagationPolicyGVR, d.EventHandler)
|
||||
d.InformerManager.ForResource(clusterPropagationPolicyGVR, d.EventHandler)
|
||||
d.EventHandler = informermanager.NewFilteringHandlerOnAllEvents(d.EventFilter, d.OnAdd, d.OnUpdate, d.OnDelete)
|
||||
d.Processor = util.NewAsyncWorker("resource detector", ClusterWideKeyFunc, d.Reconcile)
|
||||
d.Processor.Run(1, d.stopCh)
|
||||
go d.discoverResources(30 * time.Second)
|
||||
|
@ -190,20 +207,8 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error {
|
|||
klog.Error("invalid key")
|
||||
return fmt.Errorf("invalid key")
|
||||
}
|
||||
|
||||
if d.SkippedFromPropagating(clusterWideKey) {
|
||||
if clusterWideKey.Group == policyv1alpha1.GroupName {
|
||||
switch clusterWideKey.Kind {
|
||||
case "PropagationPolicy":
|
||||
return d.ReconcilePropagationPolicy(key)
|
||||
case "ClusterPropagationPolicy":
|
||||
return d.ReconcileClusterPropagationPolicy(key)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
klog.Infof("Reconciling object: %s", clusterWideKey)
|
||||
|
||||
object, err := d.GetUnstructuredObject(clusterWideKey)
|
||||
if err != nil {
|
||||
if apierrors.IsNotFound(err) {
|
||||
|
@ -230,7 +235,7 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error {
|
|||
klog.Infof("Waiting for dependent overrides present for policy(%s/%s)", propagationPolicy.Namespace, propagationPolicy.Name)
|
||||
return fmt.Errorf("waiting for dependent overrides")
|
||||
}
|
||||
|
||||
d.RemoveWaiting(clusterWideKey)
|
||||
return d.ApplyPolicy(object, clusterWideKey, propagationPolicy)
|
||||
}
|
||||
|
||||
|
@ -241,17 +246,23 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error {
|
|||
return err
|
||||
}
|
||||
if clusterPolicy != nil {
|
||||
d.RemoveWaiting(clusterWideKey)
|
||||
return d.ApplyClusterPolicy(object, clusterWideKey, clusterPolicy)
|
||||
}
|
||||
|
||||
d.EventRecorder.Event(object, corev1.EventTypeWarning, workv1alpha2.EventReasonApplyPolicyFailed, "No policy match for resource")
|
||||
// reaching here mean there is no appropriate policy for the object, put it into waiting list.
|
||||
d.AddWaiting(clusterWideKey)
|
||||
if d.isWaiting(clusterWideKey) {
|
||||
// reaching here means there is no appropriate policy for the object
|
||||
d.EventRecorder.Event(object, corev1.EventTypeWarning, workv1alpha2.EventReasonApplyPolicyFailed, "No policy match for resource")
|
||||
return nil
|
||||
}
|
||||
|
||||
return nil
|
||||
// put it into waiting list and retry once in case the resource and propagation policy come at the same time
|
||||
// see https://github.com/karmada-io/karmada/issues/1195
|
||||
d.AddWaiting(clusterWideKey)
|
||||
return fmt.Errorf("no matched propagation policy")
|
||||
}
|
||||
|
||||
// SkippedFromPropagating tells if an object should be propagated.
|
||||
// EventFilter tells if an object should be take care of.
|
||||
//
|
||||
// All objects under Kubernetes reserved namespace should be ignored:
|
||||
// - kube-system
|
||||
|
@ -264,8 +275,6 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error {
|
|||
// All objects which API group defined by Karmada should be ignored:
|
||||
// - cluster.karmada.io
|
||||
// - policy.karmada.io
|
||||
// - work.karmada.io
|
||||
// - config.karmada.io
|
||||
//
|
||||
// The api objects listed above will be ignored by default, as we don't want users to manually input the things
|
||||
// they don't care when trying to skip something else.
|
||||
|
@ -274,32 +283,42 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error {
|
|||
// the specified apis will be ignored as well.
|
||||
//
|
||||
// If '--skipped-propagating-namespaces' is specified, all APIs in the skipped-propagating-namespaces will be ignored.
|
||||
func (d *ResourceDetector) SkippedFromPropagating(clusterWideKey keys.ClusterWideKey) bool {
|
||||
func (d *ResourceDetector) EventFilter(obj interface{}) bool {
|
||||
key, err := ClusterWideKeyFunc(obj)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
|
||||
clusterWideKey, ok := key.(keys.ClusterWideKey)
|
||||
if !ok {
|
||||
klog.Errorf("Invalid key")
|
||||
return false
|
||||
}
|
||||
|
||||
if names.IsReservedNamespace(clusterWideKey.Namespace) {
|
||||
return true
|
||||
return false
|
||||
}
|
||||
|
||||
if d.SkippedResourceConfig != nil {
|
||||
if d.SkippedResourceConfig.GroupDisabled(clusterWideKey.Group) {
|
||||
klog.V(4).Infof("Skip propagating %s", clusterWideKey.Group)
|
||||
return true
|
||||
klog.V(4).Infof("Skip event for %s", clusterWideKey.Group)
|
||||
return false
|
||||
}
|
||||
if d.SkippedResourceConfig.GroupVersionDisabled(clusterWideKey.GroupVersion()) {
|
||||
klog.V(4).Infof("Skip propagating %s", clusterWideKey.GroupVersion())
|
||||
return true
|
||||
klog.V(4).Infof("Skip event for %s", clusterWideKey.GroupVersion())
|
||||
return false
|
||||
}
|
||||
if d.SkippedResourceConfig.GroupVersionKindDisabled(clusterWideKey.GroupVersionKind()) {
|
||||
klog.V(4).Infof("Skip propagating %s", clusterWideKey.GroupVersionKind())
|
||||
return true
|
||||
klog.V(4).Infof("Skip event for %s", clusterWideKey.GroupVersionKind())
|
||||
return false
|
||||
}
|
||||
}
|
||||
// if SkippedPropagatingNamespaces is set, skip object events in these namespaces.
|
||||
if _, ok := d.SkippedPropagatingNamespaces[clusterWideKey.Namespace]; ok {
|
||||
klog.V(4).Infof("Skip propagating resources in %s", clusterWideKey.Namespace)
|
||||
return true
|
||||
return false
|
||||
}
|
||||
|
||||
return false
|
||||
return true
|
||||
}
|
||||
|
||||
// OnAdd handles object add event and push the object to queue.
|
||||
|
@ -508,6 +527,7 @@ func (d *ResourceDetector) ApplyClusterPolicy(object *unstructured.Unstructured,
|
|||
}
|
||||
return nil
|
||||
})
|
||||
|
||||
if err != nil {
|
||||
klog.Errorf("Failed to apply cluster policy(%s) for object: %s. error: %v", policy.Name, objectKey, err)
|
||||
return err
|
||||
|
@ -701,6 +721,14 @@ func (d *ResourceDetector) BuildClusterResourceBinding(object *unstructured.Unst
|
|||
return binding, nil
|
||||
}
|
||||
|
||||
// isWaiting indicates if the object is in waiting list.
|
||||
func (d *ResourceDetector) isWaiting(objectKey keys.ClusterWideKey) bool {
|
||||
d.waitingLock.RLock()
|
||||
_, ok := d.waitingObjects[objectKey]
|
||||
d.waitingLock.RUnlock()
|
||||
return ok
|
||||
}
|
||||
|
||||
// AddWaiting adds object's key to waiting list.
|
||||
func (d *ResourceDetector) AddWaiting(objectKey keys.ClusterWideKey) {
|
||||
d.waitingLock.Lock()
|
||||
|
@ -744,6 +772,33 @@ func (d *ResourceDetector) GetMatching(resourceSelectors []policyv1alpha1.Resour
|
|||
return matchedResult
|
||||
}
|
||||
|
||||
// OnPropagationPolicyAdd handles object add event and push the object to queue.
|
||||
func (d *ResourceDetector) OnPropagationPolicyAdd(obj interface{}) {
|
||||
key, err := ClusterWideKeyFunc(obj)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
klog.V(2).Infof("Create PropagationPolicy(%s)", key)
|
||||
d.policyReconcileWorker.Add(key)
|
||||
}
|
||||
|
||||
// OnPropagationPolicyUpdate handles object update event and push the object to queue.
|
||||
func (d *ResourceDetector) OnPropagationPolicyUpdate(oldObj, newObj interface{}) {
|
||||
// currently do nothing, since a policy's resource selector can not be updated.
|
||||
}
|
||||
|
||||
// OnPropagationPolicyDelete handles object delete event and push the object to queue.
|
||||
func (d *ResourceDetector) OnPropagationPolicyDelete(obj interface{}) {
|
||||
key, err := ClusterWideKeyFunc(obj)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
klog.V(2).Infof("Delete PropagationPolicy(%s)", key)
|
||||
d.policyReconcileWorker.Add(key)
|
||||
}
|
||||
|
||||
// ReconcilePropagationPolicy handles PropagationPolicy resource changes.
|
||||
// When adding a PropagationPolicy, the detector will pick the objects in waitingObjects list that matches the policy and
|
||||
// put the object to queue.
|
||||
|
@ -775,6 +830,33 @@ func (d *ResourceDetector) ReconcilePropagationPolicy(key util.QueueKey) error {
|
|||
return d.HandlePropagationPolicyCreation(propagationObject)
|
||||
}
|
||||
|
||||
// OnClusterPropagationPolicyAdd handles object add event and push the object to queue.
|
||||
func (d *ResourceDetector) OnClusterPropagationPolicyAdd(obj interface{}) {
|
||||
key, err := ClusterWideKeyFunc(obj)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
klog.V(2).Infof("Create ClusterPropagationPolicy(%s)", key)
|
||||
d.clusterPolicyReconcileWorker.Add(key)
|
||||
}
|
||||
|
||||
// OnClusterPropagationPolicyUpdate handles object update event and push the object to queue.
|
||||
func (d *ResourceDetector) OnClusterPropagationPolicyUpdate(oldObj, newObj interface{}) {
|
||||
// currently do nothing, since a policy's resource selector can not be updated.
|
||||
}
|
||||
|
||||
// OnClusterPropagationPolicyDelete handles object delete event and push the object to queue.
|
||||
func (d *ResourceDetector) OnClusterPropagationPolicyDelete(obj interface{}) {
|
||||
key, err := ClusterWideKeyFunc(obj)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
klog.V(2).Infof("Delete ClusterPropagationPolicy(%s)", key)
|
||||
d.clusterPolicyReconcileWorker.Add(key)
|
||||
}
|
||||
|
||||
// ReconcileClusterPropagationPolicy handles ClusterPropagationPolicy resource changes.
|
||||
// When adding a ClusterPropagationPolicy, the detector will pick the objects in waitingObjects list that matches the policy and
|
||||
// put the object to queue.
|
||||
|
|
Loading…
Reference in New Issue