Merge pull request #1321 from pigletfly/fix-worker-num

introduce concurrent flags to controllers
karmada-bot 2022-02-25 15:17:06 +08:00 committed by GitHub
commit d980eeac5d
9 changed files with 118 additions and 49 deletions
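
Background for the diffs below: controller-runtime lets a manager carry a per-GroupKind concurrency map (Controller.GroupKindConcurrency); when the builder wires up a controller, the entry matching the controller's For() type becomes that controller's MaxConcurrentReconciles. A minimal sketch of the per-controller equivalent, assuming controller-runtime v0.11-era APIs (wireWorkController and its reconciler argument are hypothetical, not part of this PR):

package app

import (
	controllerruntime "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/controller"
	"sigs.k8s.io/controller-runtime/pkg/reconcile"

	workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
)

// wireWorkController pins the worker count for one controller via WithOptions;
// a manager-level GroupKindConcurrency entry keyed by "Work.work.karmada.io"
// resolves to the same setting without touching each controller.
func wireWorkController(mgr controllerruntime.Manager, r reconcile.Reconciler, workers int) error {
	return controllerruntime.NewControllerManagedBy(mgr).
		For(&workv1alpha1.Work{}).
		WithOptions(controller.Options{MaxConcurrentReconciles: workers}).
		Complete(r)
}

Setting the value once on the manager, as this PR does, avoids repeating WithOptions on every controller.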

View File

@@ -15,9 +15,11 @@ import (
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
+ "sigs.k8s.io/controller-runtime/pkg/config/v1alpha1"
"github.com/karmada-io/karmada/cmd/agent/app/options"
clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
+ workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
controllerscontext "github.com/karmada-io/karmada/pkg/controllers/context"
"github.com/karmada-io/karmada/pkg/controllers/execution"
"github.com/karmada-io/karmada/pkg/controllers/mcs"
@@ -126,6 +128,12 @@ func run(ctx context.Context, karmadaConfig karmadactl.KarmadaConfig, opts *options.Options) error {
LeaderElectionID: fmt.Sprintf("karmada-agent-%s", opts.ClusterName),
LeaderElectionNamespace: opts.LeaderElection.ResourceNamespace,
LeaderElectionResourceLock: opts.LeaderElection.ResourceLock,
+ Controller: v1alpha1.ControllerConfigurationSpec{
+ GroupKindConcurrency: map[string]int{
+ workv1alpha1.SchemeGroupVersion.WithKind("Work").GroupKind().String(): opts.ConcurrentWorkSyncs,
+ clusterv1alpha1.SchemeGroupVersion.WithKind("Cluster").GroupKind().String(): opts.ConcurrentClusterSyncs,
+ },
+ },
})
if err != nil {
return fmt.Errorf("failed to build controller manager: %w", err)
@@ -165,6 +173,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) {
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
ClusterAPIQPS: opts.ClusterAPIQPS,
ClusterAPIBurst: opts.ClusterAPIBurst,
+ ConcurrentWorkSyncs: opts.ConcurrentWorkSyncs,
},
StopChan: stopChan,
}
@@ -222,16 +231,16 @@ func startExecutionController(ctx controllerscontext.Context) (bool, error) {
func startWorkStatusController(ctx controllerscontext.Context) (bool, error) {
workStatusController := &status.WorkStatusController{
- Client: ctx.Mgr.GetClient(),
- EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
- RESTMapper: ctx.Mgr.GetRESTMapper(),
- InformerManager: informermanager.GetInstance(),
- StopChan: ctx.StopChan,
- WorkerNumber: 1,
- ObjectWatcher: ctx.ObjectWatcher,
- PredicateFunc: helper.NewExecutionPredicateOnAgent(),
- ClusterClientSetFunc: util.NewClusterDynamicClientSetForAgent,
- ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout,
+ Client: ctx.Mgr.GetClient(),
+ EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
+ RESTMapper: ctx.Mgr.GetRESTMapper(),
+ InformerManager: informermanager.GetInstance(),
+ StopChan: ctx.StopChan,
+ ObjectWatcher: ctx.ObjectWatcher,
+ PredicateFunc: helper.NewExecutionPredicateOnAgent(),
+ ClusterClientSetFunc: util.NewClusterDynamicClientSetForAgent,
+ ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout,
+ ConcurrentWorkStatusSyncs: ctx.Opts.ConcurrentWorkSyncs,
}
workStatusController.RunWorkQueue()
if err := workStatusController.SetupWithManager(ctx.Mgr); err != nil {
@@ -247,7 +256,7 @@ func startServiceExportController(ctx controllerscontext.Context) (bool, error) {
RESTMapper: ctx.Mgr.GetRESTMapper(),
InformerManager: informermanager.GetInstance(),
StopChan: ctx.StopChan,
- WorkerNumber: 1,
+ WorkerNumber: 3,
PredicateFunc: helper.NewPredicateForServiceExportControllerOnAgent(ctx.Opts.ClusterName),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSetForAgent,
ClusterCacheSyncTimeout: ctx.Opts.ClusterCacheSyncTimeout,

View File

@@ -55,6 +55,12 @@ type Options struct {
ClusterAPIEndpoint string
// ProxyServerAddress holds the proxy server address that is used to proxy to the cluster.
ProxyServerAddress string
+ // ConcurrentClusterSyncs is the number of Cluster objects that are
+ // allowed to sync concurrently.
+ ConcurrentClusterSyncs int
+ // ConcurrentWorkSyncs is the number of Work objects that are
+ // allowed to sync concurrently.
+ ConcurrentWorkSyncs int
}
// NewOptions builds a default set of options.
@@ -97,4 +103,6 @@ func (o *Options) AddFlags(fs *pflag.FlagSet, allControllers []string) {
fs.StringVar(&o.ClusterAPIEndpoint, "cluster-api-endpoint", o.ClusterAPIEndpoint, "APIEndpoint of the cluster.")
fs.StringVar(&o.ProxyServerAddress, "proxy-server-address", o.ProxyServerAddress, "Address of the proxy server that is used to proxy to the cluster.")
fs.DurationVar(&o.ResyncPeriod.Duration, "resync-period", 0, "Base frequency the informers are resynced.")
+ fs.IntVar(&o.ConcurrentClusterSyncs, "concurrent-cluster-syncs", 5, "The number of Clusters that are allowed to sync concurrently.")
+ fs.IntVar(&o.ConcurrentWorkSyncs, "concurrent-work-syncs", 5, "The number of Works that are allowed to sync concurrently.")
}
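
As a usage sketch (hypothetical standalone snippet, not part of the PR), binding the options to a pflag.FlagSet surfaces the new flags on the command line; passing nil for allControllers is assumed acceptable here, since that argument only feeds flag help text:

package main

import (
	"fmt"

	"github.com/spf13/pflag"

	"github.com/karmada-io/karmada/cmd/agent/app/options"
)

func main() {
	opts := options.NewOptions()
	fs := pflag.NewFlagSet("karmada-agent", pflag.ExitOnError)
	opts.AddFlags(fs, nil)
	// Override the defaults of 5, e.g. for a busy member cluster.
	_ = fs.Parse([]string{"--concurrent-work-syncs=10", "--concurrent-cluster-syncs=8"})
	fmt.Printf("work=%d cluster=%d\n", opts.ConcurrentWorkSyncs, opts.ConcurrentClusterSyncs)
}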

View File

@@ -8,18 +8,23 @@ import (
"strconv"
"github.com/spf13/cobra"
+ "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
kubeclientset "k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2"
controllerruntime "sigs.k8s.io/controller-runtime"
+ "sigs.k8s.io/controller-runtime/pkg/config/v1alpha1"
"sigs.k8s.io/controller-runtime/pkg/event"
"sigs.k8s.io/controller-runtime/pkg/healthz"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"github.com/karmada-io/karmada/cmd/controller-manager/app/options"
+ clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
+ workv1alpha1 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha1"
+ workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/clusterdiscovery/clusterapi"
"github.com/karmada-io/karmada/pkg/controllers/binding"
"github.com/karmada-io/karmada/pkg/controllers/cluster"
@@ -88,6 +93,15 @@ func Run(ctx context.Context, opts *options.Options) error {
HealthProbeBindAddress: net.JoinHostPort(opts.BindAddress, strconv.Itoa(opts.SecurePort)),
LivenessEndpointName: "/healthz",
MetricsBindAddress: opts.MetricsBindAddress,
+ Controller: v1alpha1.ControllerConfigurationSpec{
+ GroupKindConcurrency: map[string]int{
+ workv1alpha1.SchemeGroupVersion.WithKind("Work").GroupKind().String(): opts.ConcurrentWorkSyncs,
+ workv1alpha2.SchemeGroupVersion.WithKind("ResourceBinding").GroupKind().String(): opts.ConcurrentResourceBindingSyncs,
+ workv1alpha2.SchemeGroupVersion.WithKind("ClusterResourceBinding").GroupKind().String(): opts.ConcurrentClusterResourceBindingSyncs,
+ clusterv1alpha1.SchemeGroupVersion.WithKind("Cluster").GroupKind().String(): opts.ConcurrentClusterSyncs,
+ schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Namespace"}.GroupKind().String(): opts.ConcurrentNamespaceSyncs,
+ },
+ },
})
if err != nil {
klog.Errorf("failed to build controller manager: %v", err)
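
The keys of the map above are GroupKind strings, i.e. Kind.group, with a bare Kind for the core ("") group. A tiny hypothetical check of what they evaluate to:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/runtime/schema"

	clusterv1alpha1 "github.com/karmada-io/karmada/pkg/apis/cluster/v1alpha1"
	workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
)

func main() {
	fmt.Println(workv1alpha2.SchemeGroupVersion.WithKind("ResourceBinding").GroupKind().String()) // ResourceBinding.work.karmada.io
	fmt.Println(clusterv1alpha1.SchemeGroupVersion.WithKind("Cluster").GroupKind().String())      // Cluster.cluster.karmada.io
	fmt.Println(schema.GroupVersionKind{Group: "", Version: "v1", Kind: "Namespace"}.GroupKind().String()) // Namespace
}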
@@ -247,19 +261,20 @@ func startExecutionController(ctx controllerscontext.Context) (enabled bool, err error) {
func startWorkStatusController(ctx controllerscontext.Context) (enabled bool, err error) {
opts := ctx.Opts
workStatusController := &status.WorkStatusController{
- Client: ctx.Mgr.GetClient(),
- EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
- RESTMapper: ctx.Mgr.GetRESTMapper(),
- InformerManager: informermanager.GetInstance(),
- StopChan: ctx.StopChan,
- WorkerNumber: 1,
- ObjectWatcher: ctx.ObjectWatcher,
- PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr),
- ClusterClientSetFunc: util.NewClusterDynamicClientSet,
- ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
+ Client: ctx.Mgr.GetClient(),
+ EventRecorder: ctx.Mgr.GetEventRecorderFor(status.WorkStatusControllerName),
+ RESTMapper: ctx.Mgr.GetRESTMapper(),
+ InformerManager: informermanager.GetInstance(),
+ StopChan: ctx.StopChan,
+ ObjectWatcher: ctx.ObjectWatcher,
+ PredicateFunc: helper.NewExecutionPredicate(ctx.Mgr),
+ ClusterClientSetFunc: util.NewClusterDynamicClientSet,
+ ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
+ ConcurrentWorkStatusSyncs: opts.ConcurrentWorkSyncs,
}
workStatusController.RunWorkQueue()
if err := workStatusController.SetupWithManager(ctx.Mgr); err != nil {
klog.Fatalf("Failed to setup work status controller: %v", err)
return false, err
}
return true, nil
@@ -289,7 +304,7 @@ func startServiceExportController(ctx controllerscontext.Context) (enabled bool, err error) {
RESTMapper: ctx.Mgr.GetRESTMapper(),
InformerManager: informermanager.GetInstance(),
StopChan: ctx.StopChan,
- WorkerNumber: 1,
+ WorkerNumber: 3,
PredicateFunc: helper.NewPredicateForServiceExportController(ctx.Mgr),
ClusterDynamicClientSetFunc: util.NewClusterDynamicClientSet,
ClusterCacheSyncTimeout: opts.ClusterCacheSyncTimeout,
@@ -364,15 +379,17 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) {
objectWatcher := objectwatcher.NewObjectWatcher(mgr.GetClient(), mgr.GetRESTMapper(), util.NewClusterDynamicClientSet, resourceInterpreter)
resourceDetector := &detector.ResourceDetector{
- DiscoveryClientSet: discoverClientSet,
- Client: mgr.GetClient(),
- InformerManager: controlPlaneInformerManager,
- RESTMapper: mgr.GetRESTMapper(),
- DynamicClient: dynamicClientSet,
- SkippedResourceConfig: skippedResourceConfig,
- SkippedPropagatingNamespaces: skippedPropagatingNamespaces,
- ResourceInterpreter: resourceInterpreter,
- EventRecorder: mgr.GetEventRecorderFor("resource-detector"),
+ DiscoveryClientSet: discoverClientSet,
+ Client: mgr.GetClient(),
+ InformerManager: controlPlaneInformerManager,
+ RESTMapper: mgr.GetRESTMapper(),
+ DynamicClient: dynamicClientSet,
+ SkippedResourceConfig: skippedResourceConfig,
+ SkippedPropagatingNamespaces: skippedPropagatingNamespaces,
+ ResourceInterpreter: resourceInterpreter,
+ EventRecorder: mgr.GetEventRecorderFor("resource-detector"),
+ ConcurrentResourceTemplateSyncs: opts.ConcurrentResourceTemplateSyncs,
+ ConcurrentResourceBindingSyncs: opts.ConcurrentResourceBindingSyncs,
}
if err := mgr.Add(resourceDetector); err != nil {
klog.Fatalf("Failed to setup resource detector: %v", err)
@@ -394,6 +411,7 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) {
ClusterAPIQPS: opts.ClusterAPIQPS,
ClusterAPIBurst: opts.ClusterAPIBurst,
SkippedPropagatingNamespaces: opts.SkippedPropagatingNamespaces,
+ ConcurrentWorkSyncs: opts.ConcurrentWorkSyncs,
},
StopChan: stopChan,
DynamicClientSet: dynamicClientSet,
@@ -438,6 +456,7 @@ func setupClusterAPIClusterDetector(mgr controllerruntime.Manager, opts *options.Options, stopChan <-chan struct{}) {
ClusterAPIConfig: clusterAPIRestConfig,
ClusterAPIClient: clusterAPIClient,
InformerManager: informermanager.NewSingleClusterInformerManager(dynamic.NewForConfigOrDie(clusterAPIRestConfig), 0, stopChan),
+ ConcurrentReconciles: 3,
}
if err := mgr.Add(clusterAPIClusterDetector); err != nil {
klog.Fatalf("Failed to setup cluster-api cluster detector: %v", err)

View File

@@ -79,6 +79,23 @@ type Options struct {
// It can be set to "0" to disable the metrics serving.
// Defaults to ":8080".
MetricsBindAddress string
+ // ConcurrentClusterSyncs is the number of Cluster objects that are
+ // allowed to sync concurrently.
+ ConcurrentClusterSyncs int
+ // ConcurrentClusterResourceBindingSyncs is the number of ClusterResourceBinding objects that are
+ // allowed to sync concurrently.
+ ConcurrentClusterResourceBindingSyncs int
+ // ConcurrentWorkSyncs is the number of Work objects that are
+ // allowed to sync concurrently.
+ ConcurrentWorkSyncs int
+ // ConcurrentResourceBindingSyncs is the number of ResourceBinding objects that are
+ // allowed to sync concurrently.
+ ConcurrentResourceBindingSyncs int
+ // ConcurrentNamespaceSyncs is the number of Namespace objects that are
+ // allowed to sync concurrently.
+ ConcurrentNamespaceSyncs int
+ // ConcurrentResourceTemplateSyncs is the number of resource templates that are allowed to sync concurrently.
+ ConcurrentResourceTemplateSyncs int
}
// NewOptions builds an empty set of options.
@@ -134,4 +151,10 @@ func (o *Options) AddFlags(flags *pflag.FlagSet, allControllers []string) {
flags.DurationVar(&o.ClusterCacheSyncTimeout.Duration, "cluster-cache-sync-timeout", util.CacheSyncTimeout, "Timeout period waiting for cluster cache to sync.")
flags.DurationVar(&o.ResyncPeriod.Duration, "resync-period", 0, "Base frequency the informers are resynced.")
flags.StringVar(&o.MetricsBindAddress, "metrics-bind-address", ":8080", "The TCP address that the controller should bind to for serving prometheus metrics (e.g. 127.0.0.1:8088, :8088)")
+ flags.IntVar(&o.ConcurrentClusterSyncs, "concurrent-cluster-syncs", 5, "The number of Clusters that are allowed to sync concurrently.")
+ flags.IntVar(&o.ConcurrentClusterResourceBindingSyncs, "concurrent-clusterresourcebinding-syncs", 5, "The number of ClusterResourceBindings that are allowed to sync concurrently.")
+ flags.IntVar(&o.ConcurrentResourceBindingSyncs, "concurrent-resourcebinding-syncs", 5, "The number of ResourceBindings that are allowed to sync concurrently.")
+ flags.IntVar(&o.ConcurrentWorkSyncs, "concurrent-work-syncs", 5, "The number of Works that are allowed to sync concurrently.")
+ flags.IntVar(&o.ConcurrentNamespaceSyncs, "concurrent-namespace-syncs", 1, "The number of Namespaces that are allowed to sync concurrently.")
+ flags.IntVar(&o.ConcurrentResourceTemplateSyncs, "concurrent-resource-template-syncs", 5, "The number of resource templates that are allowed to sync concurrently.")
}

View File

@@ -47,6 +47,7 @@ type ClusterDetector struct {
InformerManager informermanager.SingleClusterInformerManager
EventHandler cache.ResourceEventHandler
Processor util.AsyncWorker
+ ConcurrentReconciles int
stopCh <-chan struct{}
}
@@ -58,7 +59,7 @@ func (d *ClusterDetector) Start(ctx context.Context) error {
d.EventHandler = informermanager.NewHandlerOnEvents(d.OnAdd, d.OnUpdate, d.OnDelete)
d.Processor = util.NewAsyncWorker("cluster-api cluster detector", ClusterWideKeyFunc, d.Reconcile)
- d.Processor.Run(1, d.stopCh)
+ d.Processor.Run(d.ConcurrentReconciles, d.stopCh)
d.discoveryCluster()
<-d.stopCh

View File

@@ -47,6 +47,8 @@ type Options struct {
SkippedPropagatingNamespaces []string
// ClusterName is the name of cluster.
ClusterName string
+ // ConcurrentWorkSyncs is the number of Works that are allowed to sync concurrently.
+ ConcurrentWorkSyncs int
}
// Context defines the context object for controller.

View File

@@ -34,19 +34,19 @@ const WorkStatusControllerName = "work-status-controller"
// WorkStatusController is to sync status of Work.
type WorkStatusController struct {
- client.Client // used to operate Work resources.
- EventRecorder record.EventRecorder
- RESTMapper meta.RESTMapper
- InformerManager informermanager.MultiClusterInformerManager
- eventHandler cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster.
- StopChan <-chan struct{}
- WorkerNumber int // WorkerNumber is the number of worker goroutines.
- worker util.AsyncWorker // worker processes resources periodically from the rate-limiting queue.
- ObjectWatcher objectwatcher.ObjectWatcher
- PredicateFunc predicate.Predicate
- ClusterClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
- ClusterCacheSyncTimeout metav1.Duration
+ client.Client // used to operate Work resources.
+ EventRecorder record.EventRecorder
+ RESTMapper meta.RESTMapper
+ InformerManager informermanager.MultiClusterInformerManager
+ eventHandler cache.ResourceEventHandler // eventHandler knows how to handle events from the member cluster.
+ StopChan <-chan struct{}
+ worker util.AsyncWorker // worker processes resources periodically from the rate-limiting queue.
+ // ConcurrentWorkStatusSyncs is the number of Work status syncs that are allowed to run concurrently.
+ ConcurrentWorkStatusSyncs int
+ ObjectWatcher objectwatcher.ObjectWatcher
+ PredicateFunc predicate.Predicate
+ ClusterClientSetFunc func(clusterName string, client client.Client) (*util.DynamicClusterClient, error)
+ ClusterCacheSyncTimeout metav1.Duration
}
// Reconcile performs a full reconciliation for the object referred to by the Request.
@@ -115,7 +115,7 @@ func (c *WorkStatusController) getEventHandler() cache.ResourceEventHandler {
// RunWorkQueue initializes the worker and runs it; the worker will process resources asynchronously.
func (c *WorkStatusController) RunWorkQueue() {
c.worker = util.NewAsyncWorker("work-status", generateKey, c.syncWorkStatus)
- c.worker.Run(c.WorkerNumber, c.StopChan)
+ c.worker.Run(c.ConcurrentWorkStatusSyncs, c.StopChan)
}
// generateKey generates a key from obj; the key contains cluster, GVK, namespace, and name.
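
For context on what Run(n, stopCh) buys here: util.AsyncWorker (not shown in this diff) wraps a client-go rate-limiting work queue, and n bounds how many keys are synced in parallel. A minimal sketch of that pattern, under the assumption that the real helper behaves like a standard controller worker pool:

package worker

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/util/workqueue"
)

// run starts n goroutines that drain one shared rate-limiting queue, so the
// number passed to Run directly bounds how many keys sync in parallel.
// This is a sketch of the pattern, not karmada's actual implementation.
func run(queue workqueue.RateLimitingInterface, sync func(key interface{}) error, n int, stopCh <-chan struct{}) {
	for i := 0; i < n; i++ {
		go wait.Until(func() {
			for processNext(queue, sync) {
			}
		}, time.Second, stopCh)
	}
}

func processNext(queue workqueue.RateLimitingInterface, sync func(key interface{}) error) bool {
	key, shutdown := queue.Get()
	if shutdown {
		return false
	}
	defer queue.Done(key)
	if err := sync(key); err != nil {
		queue.AddRateLimited(key) // retry with backoff on failure
		return true
	}
	queue.Forget(key) // success: drop retry history
	return true
}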

View File

@@ -244,7 +244,8 @@ func (c *Controller) SetupWithManager(mgr controllerruntime.Manager) error {
return utilerrors.NewAggregate([]error{
controllerruntime.NewControllerManagedBy(mgr).For(&clusterv1alpha1.Cluster{}).WithEventFilter(clusterPredicateFunc).
Watches(&source.Kind{Type: &rbacv1.ClusterRole{}}, handler.EnqueueRequestsFromMapFunc(c.newClusterRoleMapFunc())).
- Watches(&source.Kind{Type: &rbacv1.ClusterRoleBinding{}}, handler.EnqueueRequestsFromMapFunc(c.newClusterRoleBindingMapFunc())).Complete(c),
+ Watches(&source.Kind{Type: &rbacv1.ClusterRoleBinding{}}, handler.EnqueueRequestsFromMapFunc(c.newClusterRoleBindingMapFunc())).
+ Complete(c),
mgr.Add(c),
})
}

View File

@@ -76,6 +76,12 @@ type ResourceDetector struct {
waitingObjects map[keys.ClusterWideKey]struct{}
// waitingLock is the lock for waitingObjects operation.
waitingLock sync.RWMutex
+ // ConcurrentResourceTemplateSyncs is the number of resource templates that are allowed to sync concurrently.
+ // A larger number means more responsive resource template syncing but more CPU (and network) load.
+ ConcurrentResourceTemplateSyncs int
+ // ConcurrentResourceBindingSyncs is the number of ResourceBinding objects that are allowed to sync concurrently.
+ // A larger number means more responsive ResourceBinding syncing but more CPU (and network) load.
+ ConcurrentResourceBindingSyncs int
stopCh <-chan struct{}
}
@@ -114,7 +120,7 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
// setup binding reconcile worker
d.bindingReconcileWorker = util.NewAsyncWorker("resourceBinding reconciler", ClusterWideKeyFunc, d.ReconcileResourceBinding)
- d.bindingReconcileWorker.Run(1, d.stopCh)
+ d.bindingReconcileWorker.Run(d.ConcurrentResourceBindingSyncs, d.stopCh)
// watch and enqueue ResourceBinding changes.
resourceBindingGVR := schema.GroupVersionResource{
@@ -137,7 +143,7 @@ func (d *ResourceDetector) Start(ctx context.Context) error {
d.EventHandler = informermanager.NewFilteringHandlerOnAllEvents(d.EventFilter, d.OnAdd, d.OnUpdate, d.OnDelete)
d.Processor = util.NewAsyncWorker("resource detector", ClusterWideKeyFunc, d.Reconcile)
- d.Processor.Run(1, d.stopCh)
+ d.Processor.Run(d.ConcurrentResourceTemplateSyncs, d.stopCh)
go d.discoverResources(30 * time.Second)
<-d.stopCh