Merge pull request #5809 from CharlesQQ/introduce-flag-dependencies-distributor

feat(dependenciesdistributor):  introduce --concurrent-dependent-resource-syncs flag
This commit is contained in:
karmada-bot 2024-11-19 21:02:56 +08:00 committed by GitHub
commit 6795dba9d1
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 14 additions and 8 deletions

View File

@ -773,13 +773,14 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
}
if features.FeatureGate.Enabled(features.PropagateDeps) {
dependenciesDistributor := &dependenciesdistributor.DependenciesDistributor{
Client: mgr.GetClient(),
DynamicClient: dynamicClientSet,
InformerManager: controlPlaneInformerManager,
ResourceInterpreter: resourceInterpreter,
RESTMapper: mgr.GetRESTMapper(),
EventRecorder: mgr.GetEventRecorderFor("dependencies-distributor"),
RateLimiterOptions: opts.RateLimiterOpts,
Client: mgr.GetClient(),
DynamicClient: dynamicClientSet,
InformerManager: controlPlaneInformerManager,
ResourceInterpreter: resourceInterpreter,
RESTMapper: mgr.GetRESTMapper(),
EventRecorder: mgr.GetEventRecorderFor("dependencies-distributor"),
RateLimiterOptions: opts.RateLimiterOpts,
ConcurrentDependentResourceSyncs: opts.ConcurrentDependentResourceSyncs,
}
if err := dependenciesDistributor.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup dependencies distributor: %v", err)

View File

@ -128,6 +128,8 @@ type Options struct {
ConcurrentClusterPropagationPolicySyncs int
// ConcurrentResourceTemplateSyncs is the number of resource templates that are allowed to sync concurrently.
ConcurrentResourceTemplateSyncs int
// ConcurrentDependentResourceSyncs is the number of dependent resource that are allowed to sync concurrently.
ConcurrentDependentResourceSyncs int
// If set to true enables NoExecute Taints and will evict all not-tolerating
// objects propagating on Clusters tainted with this kind of Taints.
EnableTaintManager bool
@ -219,6 +221,7 @@ func (o *Options) AddFlags(flags *pflag.FlagSet, allControllers, disabledByDefau
flags.IntVar(&o.ConcurrentPropagationPolicySyncs, "concurrent-propagation-policy-syncs", 1, "The number of PropagationPolicy that are allowed to sync concurrently.")
flags.IntVar(&o.ConcurrentClusterPropagationPolicySyncs, "concurrent-cluster-propagation-policy-syncs", 1, "The number of ClusterPropagationPolicy that are allowed to sync concurrently.")
flags.IntVar(&o.ConcurrentResourceTemplateSyncs, "concurrent-resource-template-syncs", 5, "The number of resource templates that are allowed to sync concurrently.")
flags.IntVar(&o.ConcurrentDependentResourceSyncs, "concurrent-dependent-resource-syncs", 2, "The number of dependent resource that are allowed to sync concurrently.")
flags.BoolVar(&o.EnableTaintManager, "enable-taint-manager", true, "If set to true enables NoExecute Taints and will evict all not-tolerating objects propagating on Clusters tainted with this kind of Taints.")
flags.DurationVar(&o.GracefulEvictionTimeout.Duration, "graceful-eviction-timeout", 10*time.Minute, "Specifies the timeout period waiting for the graceful-eviction-controller performs the final removal since the workload(resource) has been moved to the graceful eviction tasks.")
flags.BoolVar(&o.EnableClusterResourceModeling, "enable-cluster-resource-modeling", true, "Enable means controller would build resource modeling for each cluster by syncing Nodes and Pods resources.\n"+

View File

@ -110,6 +110,8 @@ type DependenciesDistributor struct {
resourceProcessor util.AsyncWorker
genericEvent chan event.TypedGenericEvent[*workv1alpha2.ResourceBinding]
stopCh <-chan struct{}
// ConcurrentDependentResourceSyncs is the number of dependent resource that are allowed to sync concurrently.
ConcurrentDependentResourceSyncs int
}
// Check if our DependenciesDistributor implements necessary interfaces
@ -615,7 +617,7 @@ func (d *DependenciesDistributor) Start(ctx context.Context) error {
}
d.eventHandler = fedinformer.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete)
d.resourceProcessor = util.NewAsyncWorker(resourceWorkerOptions)
d.resourceProcessor.Run(2, d.stopCh)
d.resourceProcessor.Run(d.ConcurrentDependentResourceSyncs, d.stopCh)
<-d.stopCh
klog.Infof("Stopped as stopCh closed.")