add concurrency for each controller (#2160)

This commit is contained in:
zhaojizhuang 2021-06-26 03:41:44 +08:00 committed by GitHub
parent e117baa4cf
commit 4cdacd0473
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 14 additions and 1 deletions

View File

@ -57,6 +57,7 @@ var (
// when processing the controller's workqueue. Controller binaries // when processing the controller's workqueue. Controller binaries
// may adjust this process-wide default. For finer control, invoke // may adjust this process-wide default. For finer control, invoke
// Run on the controller directly. // Run on the controller directly.
// TODO rename the const to Concurrency and deprecated this
DefaultThreadsPerController = 2 DefaultThreadsPerController = 2
) )
@ -203,6 +204,9 @@ type Impl struct {
// which are not required to complete at the highest priority. // which are not required to complete at the highest priority.
workQueue *twoLaneQueue workQueue *twoLaneQueue
// Concurrency - The number of workers to use when processing the controller's workqueue.
Concurrency int
// Sugared logger is easier to use but is not as performant as the // Sugared logger is easier to use but is not as performant as the
// raw logger. In performance critical paths, call logger.Desugar() // raw logger. In performance critical paths, call logger.Desugar()
// and use the returned raw logger instead. In addition to the // and use the returned raw logger instead. In addition to the
@ -221,6 +225,7 @@ type ControllerOptions struct { //nolint // for backcompat.
Logger *zap.SugaredLogger Logger *zap.SugaredLogger
Reporter StatsReporter Reporter StatsReporter
RateLimiter workqueue.RateLimiter RateLimiter workqueue.RateLimiter
Concurrency int
} }
// NewImpl instantiates an instance of our controller that will feed work to the // NewImpl instantiates an instance of our controller that will feed work to the
@ -244,12 +249,16 @@ func NewImplFull(r Reconciler, options ControllerOptions) *Impl {
if options.Reporter == nil { if options.Reporter == nil {
options.Reporter = MustNewStatsReporter(options.WorkQueueName, options.Logger) options.Reporter = MustNewStatsReporter(options.WorkQueueName, options.Logger)
} }
if options.Concurrency == 0 {
options.Concurrency = DefaultThreadsPerController
}
return &Impl{ return &Impl{
Name: options.WorkQueueName, Name: options.WorkQueueName,
Reconciler: r, Reconciler: r,
workQueue: newTwoLaneWorkQueue(options.WorkQueueName, options.RateLimiter), workQueue: newTwoLaneWorkQueue(options.WorkQueueName, options.RateLimiter),
logger: options.Logger, logger: options.Logger,
statsReporter: options.Reporter, statsReporter: options.Reporter,
Concurrency: options.Concurrency,
} }
} }
@ -723,9 +732,10 @@ func StartAll(ctx context.Context, controllers ...*Impl) {
// Start all of the controllers. // Start all of the controllers.
for _, ctrlr := range controllers { for _, ctrlr := range controllers {
wg.Add(1) wg.Add(1)
concurrency := ctrlr.Concurrency
go func(c *Impl) { go func(c *Impl) {
defer wg.Done() defer wg.Done()
c.RunContext(ctx, DefaultThreadsPerController) c.RunContext(ctx, concurrency)
}(ctrlr) }(ctrlr)
} }
wg.Wait() wg.Wait()

View File

@ -38,6 +38,9 @@ type Options struct {
// DemoteFunc configures the demote function this reconciler uses // DemoteFunc configures the demote function this reconciler uses
DemoteFunc func(b reconciler.Bucket) DemoteFunc func(b reconciler.Bucket)
// Concurrency - The number of workers to use when processing the controller's workqueue.
Concurrency int
} }
// OptionsFn is a callback method signature that accepts an Impl and returns // OptionsFn is a callback method signature that accepts an Impl and returns