Merge pull request #3827 from XiShanYongYe-Chang/cleanup-dependencies-distributor

Make some cleanup in dependencies distributor
This commit is contained in:
karmada-bot 2023-08-01 19:05:44 +08:00 committed by GitHub
commit d916d877a4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 52 additions and 62 deletions

View File

@ -664,7 +664,6 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
RESTMapper: mgr.GetRESTMapper(),
EventRecorder: mgr.GetEventRecorderFor("dependencies-distributor"),
RateLimiterOptions: opts.RateLimiterOpts,
GenericEvent: make(chan event.GenericEvent),
}
if err := dependenciesDistributor.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup dependencies distributor: %v", err)

View File

@ -13,7 +13,6 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/client-go/dynamic"
@ -33,7 +32,6 @@ import (
configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/detector"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
"github.com/karmada-io/karmada/pkg/sharedcli/ratelimiterflag"
@ -64,14 +62,14 @@ type DependenciesDistributor struct {
// DynamicClient used to fetch arbitrary resources.
DynamicClient dynamic.Interface
InformerManager genericmanager.SingleClusterInformerManager
EventHandler cache.ResourceEventHandler
EventRecorder record.EventRecorder
Processor util.AsyncWorker
RESTMapper meta.RESTMapper
ResourceInterpreter resourceinterpreter.ResourceInterpreter
RateLimiterOptions ratelimiterflag.Options
GenericEvent chan event.GenericEvent
eventHandler cache.ResourceEventHandler
resourceProcessor util.AsyncWorker
genericEvent chan event.GenericEvent
stopCh <-chan struct{}
}
@ -91,7 +89,7 @@ func (d *DependenciesDistributor) OnAdd(obj interface{}) {
if !ok {
return
}
d.Processor.Enqueue(runtimeObj)
d.resourceProcessor.Enqueue(runtimeObj)
}
// OnUpdate handles object update event and push the object to queue.
@ -121,45 +119,45 @@ func (d *DependenciesDistributor) reconcile(key util.QueueKey) error {
return err
}
var errs []error
for i := range bindingList.Items {
binding := &bindingList.Items[i]
if !binding.DeletionTimestamp.IsZero() {
continue
}
matched, err := dependentObjectReferenceMatches(clusterWideKey, binding)
if err != nil {
klog.Errorf("Failed to evaluate if binding(%s/%s) need to sync dependencies: %v", binding.Namespace, binding.Name, err)
errs = append(errs, err)
continue
} else if !matched {
matched := dependentObjectReferenceMatches(clusterWideKey, binding)
if !matched {
klog.V(4).Infof("No need to sync binding(%s/%s)", binding.Namespace, binding.Name)
continue
}
klog.V(4).Infof("Resource binding(%s/%s) is matched for resource(%s/%s)", binding.Namespace, binding.Name, clusterWideKey.Namespace, clusterWideKey.Name)
d.GenericEvent <- event.GenericEvent{Object: binding}
d.genericEvent <- event.GenericEvent{Object: binding}
}
return utilerrors.NewAggregate(errs)
return nil
}
// dependentObjectReferenceMatches tells if the given object is referred by current resource binding.
func dependentObjectReferenceMatches(objectKey keys.ClusterWideKey, referenceBinding *workv1alpha2.ResourceBinding) (bool, error) {
func dependentObjectReferenceMatches(objectKey keys.ClusterWideKey, referenceBinding *workv1alpha2.ResourceBinding) bool {
dependencies, exist := referenceBinding.Annotations[bindingDependenciesAnnotationKey]
if !exist {
return false, nil
return false
}
var dependenciesSlice []configv1alpha1.DependentObjectReference
err := json.Unmarshal([]byte(dependencies), &dependenciesSlice)
if err != nil {
return false, err
// If unmarshal fails, retrying with an error return will not solve the problem.
// It will only increase the consumption by repeatedly listing the binding.
// Therefore, it is better to print this error and ignore it.
klog.Errorf("Failed to unmarshal binding(%s/%s) dependencies(%s): %v",
referenceBinding.Namespace, referenceBinding.Name, dependencies, err)
return false
}
if len(dependenciesSlice) == 0 {
return false, nil
return false
}
for _, dependence := range dependenciesSlice {
@ -167,11 +165,10 @@ func dependentObjectReferenceMatches(objectKey keys.ClusterWideKey, referenceBin
objectKey.Kind == dependence.Kind &&
objectKey.Namespace == dependence.Namespace &&
objectKey.Name == dependence.Name {
return true, nil
return true
}
}
return false, nil
return false
}
// Reconcile performs a full reconciliation for the object referred to by the Request.
@ -180,7 +177,7 @@ func dependentObjectReferenceMatches(objectKey keys.ClusterWideKey, referenceBin
func (d *DependenciesDistributor) Reconcile(ctx context.Context, request reconcile.Request) (reconcile.Result, error) {
klog.V(4).Infof("Start to reconcile ResourceBinding(%s)", request.NamespacedName)
bindingObject := &workv1alpha2.ResourceBinding{}
err := d.Client.Get(ctx, types.NamespacedName{Namespace: request.Namespace, Name: request.Name}, bindingObject)
err := d.Client.Get(ctx, request.NamespacedName, bindingObject)
if err != nil {
if apierrors.IsNotFound(err) {
klog.V(4).Infof("ResourceBinding(%s) has been removed.", request.NamespacedName)
@ -233,12 +230,12 @@ func (d *DependenciesDistributor) syncScheduleResultToAttachedBindings(binding *
}
}()
if err := d.recordDependenciesForIndependentBinding(binding, dependencies); err != nil {
if err := d.recordDependencies(binding, dependencies); err != nil {
return err
}
// remove orphan attached bindings
orphanBindings, err := d.findOrphanAttachedResourceBindings(binding, dependencies)
orphanBindings, err := d.findOrphanAttachedBindings(binding, dependencies)
if err != nil {
klog.Errorf("Failed to find orphan attached bindings for resourceBinding(%s/%s). Error: %v.",
binding.GetNamespace(), binding.GetName(), err)
@ -267,8 +264,8 @@ func (d *DependenciesDistributor) syncScheduleResultToAttachedBindings(binding *
errs = append(errs, err)
continue
}
if !d.InformerManager.IsHandlerExist(gvr, d.EventHandler) {
d.InformerManager.ForResource(gvr, d.EventHandler)
if !d.InformerManager.IsHandlerExist(gvr, d.eventHandler) {
d.InformerManager.ForResource(gvr, d.eventHandler)
startInformerManager = true
}
rawObject, err := helper.FetchResourceTemplate(d.DynamicClient, d.InformerManager, d.RESTMapper, resource)
@ -293,12 +290,13 @@ func (d *DependenciesDistributor) syncScheduleResultToAttachedBindings(binding *
return utilerrors.NewAggregate(errs)
}
func (d *DependenciesDistributor) recordDependenciesForIndependentBinding(binding *workv1alpha2.ResourceBinding, dependencies []configv1alpha1.DependentObjectReference) error {
func (d *DependenciesDistributor) recordDependencies(binding *workv1alpha2.ResourceBinding, dependencies []configv1alpha1.DependentObjectReference) error {
dependenciesBytes, err := json.Marshal(dependencies)
if err != nil {
klog.Errorf("Failed to marshal dependencies of binding(%s/%s): %v", binding.Namespace, binding.Name, err)
return err
}
depenciesStr := string(dependenciesBytes)
objectAnnotation := binding.GetAnnotations()
if objectAnnotation == nil {
@ -306,11 +304,11 @@ func (d *DependenciesDistributor) recordDependenciesForIndependentBinding(bindin
}
// dependencies are not updated, no need to update annotation.
if oldDependencies, exist := objectAnnotation[bindingDependenciesAnnotationKey]; exist && oldDependencies == string(dependenciesBytes) {
if oldDependencies, exist := objectAnnotation[bindingDependenciesAnnotationKey]; exist && oldDependencies == depenciesStr {
return nil
}
objectAnnotation[bindingDependenciesAnnotationKey] = string(dependenciesBytes)
objectAnnotation[bindingDependenciesAnnotationKey] = depenciesStr
return retry.RetryOnConflict(retry.DefaultRetry, func() (err error) {
binding.SetAnnotations(objectAnnotation)
@ -329,7 +327,7 @@ func (d *DependenciesDistributor) recordDependenciesForIndependentBinding(bindin
})
}
func (d *DependenciesDistributor) findOrphanAttachedResourceBindings(independentBinding *workv1alpha2.ResourceBinding, dependencies []configv1alpha1.DependentObjectReference) ([]*workv1alpha2.ResourceBinding, error) {
func (d *DependenciesDistributor) findOrphanAttachedBindings(independentBinding *workv1alpha2.ResourceBinding, dependencies []configv1alpha1.DependentObjectReference) ([]*workv1alpha2.ResourceBinding, error) {
attachedBindings, err := d.listAttachedBindings(independentBinding.Namespace, independentBinding.Name)
if err != nil {
return nil, err
@ -352,8 +350,8 @@ func (d *DependenciesDistributor) findOrphanAttachedResourceBindings(independent
}
func (d *DependenciesDistributor) listAttachedBindings(bindingNamespace, bindingName string) (res []*workv1alpha2.ResourceBinding, err error) {
label := generateBindingDependedByLabel(bindingNamespace, bindingName)
selector := labels.SelectorFromSet(label)
labelSet := generateBindingDependedLabels(bindingNamespace, bindingName)
selector := labels.SelectorFromSet(labelSet)
bindingList := &workv1alpha2.ResourceBindingList{}
err = d.Client.List(context.TODO(), bindingList, &client.ListOptions{
Namespace: bindingNamespace,
@ -372,7 +370,7 @@ func (d *DependenciesDistributor) removeScheduleResultFromAttachedBindings(bindi
return nil
}
bindingLabelKey := generateBindingDependedByLabelKey(bindingNamespace, bindingName)
bindingLabelKey := generateBindingDependedLabelKey(bindingNamespace, bindingName)
var errs []error
for index, binding := range attachedBindings {
@ -402,8 +400,7 @@ func (d *DependenciesDistributor) createOrUpdateAttachedBinding(attachedBinding
return err
}
updatedBindingSnapshot := mergeBindingSnapshot(existBinding.Spec.RequiredBy, attachedBinding.Spec.RequiredBy)
existBinding.Spec.RequiredBy = updatedBindingSnapshot
existBinding.Spec.RequiredBy = mergeBindingSnapshot(existBinding.Spec.RequiredBy, attachedBinding.Spec.RequiredBy)
existBinding.Labels = util.DedupeAndMergeLabels(existBinding.Labels, attachedBinding.Labels)
existBinding.Spec.Resource = attachedBinding.Spec.Resource
@ -420,13 +417,13 @@ func (d *DependenciesDistributor) Start(ctx context.Context) error {
klog.Infof("Starting dependencies distributor.")
d.stopCh = ctx.Done()
resourceWorkerOptions := util.Options{
Name: "resource detector",
KeyFunc: detector.ClusterWideKeyFunc,
Name: "dependencies resource detector",
KeyFunc: func(obj interface{}) (util.QueueKey, error) { return keys.ClusterWideKeyFunc(obj) },
ReconcileFunc: d.reconcile,
}
d.EventHandler = fedinformer.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete)
d.Processor = util.NewAsyncWorker(resourceWorkerOptions)
d.Processor.Run(2, d.stopCh)
d.eventHandler = fedinformer.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete)
d.resourceProcessor = util.NewAsyncWorker(resourceWorkerOptions)
d.resourceProcessor.Run(2, d.stopCh)
<-d.stopCh
klog.Infof("Stopped as stopCh closed.")
@ -435,6 +432,7 @@ func (d *DependenciesDistributor) Start(ctx context.Context) error {
// SetupWithManager creates a controller and register to controller manager.
func (d *DependenciesDistributor) SetupWithManager(mgr controllerruntime.Manager) error {
d.genericEvent = make(chan event.GenericEvent)
return utilerrors.NewAggregate([]error{
mgr.Add(d),
controllerruntime.NewControllerManagedBy(mgr).For(&workv1alpha2.ResourceBinding{}).
@ -474,18 +472,18 @@ func (d *DependenciesDistributor) SetupWithManager(mgr controllerruntime.Manager
RateLimiter: ratelimiterflag.DefaultControllerRateLimiter(d.RateLimiterOptions),
MaxConcurrentReconciles: 2,
}).
WatchesRawSource(&source.Channel{Source: d.GenericEvent}, &handler.EnqueueRequestForObject{}).
WatchesRawSource(&source.Channel{Source: d.genericEvent}, &handler.EnqueueRequestForObject{}).
Complete(d),
})
}
func generateBindingDependedByLabel(bindingNamespace, bindingName string) map[string]string {
labelKey := generateBindingDependedByLabelKey(bindingNamespace, bindingName)
func generateBindingDependedLabels(bindingNamespace, bindingName string) map[string]string {
labelKey := generateBindingDependedLabelKey(bindingNamespace, bindingName)
labelValue := fmt.Sprintf(bindingNamespace + "_" + bindingName)
return map[string]string{labelKey: labelValue}
}
func generateBindingDependedByLabelKey(bindingNamespace, bindingName string) string {
func generateBindingDependedLabelKey(bindingNamespace, bindingName string) string {
bindHashKey := names.GenerateBindingReferenceKey(bindingNamespace, bindingName)
return fmt.Sprintf(bindingDependedByLabelKeyPrefix + bindHashKey)
}
@ -499,7 +497,7 @@ func generateDependencyKey(kind, apiVersion, name, namespace string) string {
}
func buildAttachedBinding(binding *workv1alpha2.ResourceBinding, object *unstructured.Unstructured) *workv1alpha2.ResourceBinding {
dependedByLabels := generateBindingDependedByLabel(binding.Namespace, binding.Name)
dependedLabels := generateBindingDependedLabels(binding.Namespace, binding.Name)
var result []workv1alpha2.BindingSnapshot
result = append(result, workv1alpha2.BindingSnapshot{
@ -515,7 +513,7 @@ func buildAttachedBinding(binding *workv1alpha2.ResourceBinding, object *unstruc
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(object, object.GroupVersionKind()),
},
Labels: dependedByLabels,
Labels: dependedLabels,
Finalizers: []string{util.BindingControllerFinalizer},
},
Spec: workv1alpha2.ResourceBindingSpec{

View File

@ -18,7 +18,6 @@ func Test_dependentObjectReferenceMatches(t *testing.T) {
name string
args args
want bool
wantErr bool
}{
{
name: "test custom resource",
@ -37,7 +36,6 @@ func Test_dependentObjectReferenceMatches(t *testing.T) {
},
},
want: true,
wantErr: false,
},
{
name: "test configmap",
@ -56,16 +54,11 @@ func Test_dependentObjectReferenceMatches(t *testing.T) {
},
},
want: true,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := dependentObjectReferenceMatches(tt.args.objectKey, tt.args.referenceBinding)
if (err != nil) != tt.wantErr {
t.Errorf("dependentObjectReferenceMatches() error = %v, wantErr %v", err, tt.wantErr)
return
}
got := dependentObjectReferenceMatches(tt.args.objectKey, tt.args.referenceBinding)
if got != tt.want {
t.Errorf("dependentObjectReferenceMatches() got = %v, want %v", got, tt.want)
}