Change StartAll to take context. (#1247)

* Change StartAll to take context.

This has bugged me since we started using `ctx`, which containers a `stopCh` of sorts as `Done()`.  This is somewhat for consistency, but by using `ctx` explicitly we enable ourselves to take advantage of more contextual information.

I did a quick scan of call sites and the good news is that the `sharedmain` change should be the place through which the vast majority of calls occur, however, the one outlier here is the KPA which calls this manually.  I will stage a PR to manually import pkg into serving to fix this once this lands.

* Add a Run shim for back-compat
This commit is contained in:
Matt Moore 2020-04-25 16:21:49 -07:00 committed by GitHub
parent 3b7a675e73
commit 7b6e21a57a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 43 additions and 31 deletions

View File

@ -329,10 +329,11 @@ func (c *Impl) EnqueueKeyAfter(key types.NamespacedName, delay time.Duration) {
c.logger.Debugf("Adding to queue %s (delay: %v, depth: %d)", safeKey(key), delay, c.WorkQueue.Len())
}
// Run starts the controller's worker threads, the number of which is threadiness.
// It then blocks until stopCh is closed, at which point it shuts down its internal
// work queue and waits for workers to finish processing their current work items.
func (c *Impl) Run(threadiness int, stopCh <-chan struct{}) error {
// RunContext starts the controller's worker threads, the number of which is threadiness.
// It then blocks until the context is cancelled, at which point it shuts down its
// internal work queue and waits for workers to finish processing their current
// work items.
func (c *Impl) RunContext(ctx context.Context, threadiness int) error {
defer runtime.HandleCrash()
sg := sync.WaitGroup{}
defer sg.Wait()
@ -356,12 +357,23 @@ func (c *Impl) Run(threadiness int, stopCh <-chan struct{}) error {
}
logger.Info("Started workers")
<-stopCh
<-ctx.Done()
logger.Info("Shutting down workers")
return nil
}
// DEPRECATED use RunContext instead.
func (c *Impl) Run(threadiness int, stopCh <-chan struct{}) error {
// Create a context that is cancelled when the stopCh is called.
ctx, cancel := context.WithCancel(context.Background())
go func() {
<-stopCh
cancel()
}()
return c.RunContext(ctx, threadiness)
}
// processNextWorkItem will read a single work item off the workqueue and
// attempt to process it, by calling Reconcile on our Reconciler.
func (c *Impl) processNextWorkItem() bool {
@ -529,14 +541,14 @@ func RunInformers(stopCh <-chan struct{}, informers ...Informer) (func(), error)
}
// StartAll kicks off all of the passed controllers with DefaultThreadsPerController.
func StartAll(stopCh <-chan struct{}, controllers ...*Impl) {
func StartAll(ctx context.Context, controllers ...*Impl) {
wg := sync.WaitGroup{}
// Start all of the controllers.
for _, ctrlr := range controllers {
wg.Add(1)
go func(c *Impl) {
defer wg.Done()
c.Run(DefaultThreadsPerController, stopCh)
c.RunContext(ctx, DefaultThreadsPerController)
}(ctrlr)
}
wg.Wait()

View File

@ -796,21 +796,21 @@ func TestStartAndShutdown(t *testing.T) {
r := &CountingReconciler{}
impl := NewImplWithStats(r, TestLogger(t), "Testing", &FakeStatsReporter{})
stopCh := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
StartAll(stopCh, impl)
StartAll(ctx, impl)
}()
select {
case <-time.After(10 * time.Millisecond):
// We don't expect completion before the stopCh closes.
// We don't expect completion before the context is cancelled.
case <-doneCh:
t.Error("StartAll finished early.")
}
close(stopCh)
cancel()
select {
case <-time.After(1 * time.Second):
@ -830,23 +830,23 @@ func TestStartAndShutdownWithWork(t *testing.T) {
reporter := &FakeStatsReporter{}
impl := NewImplWithStats(r, TestLogger(t), "Testing", reporter)
stopCh := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
doneCh := make(chan struct{})
impl.EnqueueKey(types.NamespacedName{Namespace: "foo", Name: "bar"})
go func() {
defer close(doneCh)
StartAll(stopCh, impl)
StartAll(ctx, impl)
}()
select {
case <-time.After(10 * time.Millisecond):
// We don't expect completion before the stopCh closes.
// We don't expect completion before the context is cancelled.
case <-doneCh:
t.Error("StartAll finished early.")
}
close(stopCh)
cancel()
select {
case <-time.After(1 * time.Second):
@ -877,7 +877,7 @@ func TestStartAndShutdownWithErroringWork(t *testing.T) {
reporter := &FakeStatsReporter{}
impl := NewImplWithStats(r, TestLogger(t), "Testing", reporter)
stopCh := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
doneCh := make(chan struct{})
impl.EnqueueKey(types.NamespacedName{Namespace: "", Name: "bar"})
@ -885,22 +885,22 @@ func TestStartAndShutdownWithErroringWork(t *testing.T) {
go func() {
defer close(doneCh)
// StartAll blocks until all the worker threads finish, which shouldn't
// be until we close stopCh.
StartAll(stopCh, impl)
// be until we cancel the context.
StartAll(ctx, impl)
}()
select {
case <-time.After(1 * time.Second):
// We don't expect completion before the stopCh closes,
// We don't expect completion before the context is cancelled,
// but the workers should spin on the erroring work.
case <-doneCh:
t.Error("StartAll finished early.")
}
// By closing the stopCh all the workers should complete and
// By cancelling the context all the workers should complete and
// we should close the doneCh.
close(stopCh)
cancel()
select {
case <-time.After(1 * time.Second):
@ -932,23 +932,23 @@ func TestStartAndShutdownWithPermanentErroringWork(t *testing.T) {
reporter := &FakeStatsReporter{}
impl := NewImplWithStats(r, TestLogger(t), "Testing", reporter)
stopCh := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
doneCh := make(chan struct{})
impl.EnqueueKey(types.NamespacedName{Namespace: "foo", Name: "bar"})
go func() {
defer close(doneCh)
StartAll(stopCh, impl)
StartAll(ctx, impl)
}()
select {
case <-time.After(20 * time.Millisecond):
// We don't expect completion before the stopCh closes.
// We don't expect completion before the context is cancelled.
case <-doneCh:
t.Error("StartAll finished early.")
}
close(stopCh)
cancel()
select {
case <-time.After(1 * time.Second):
@ -1023,12 +1023,12 @@ func TestImplGlobalResync(t *testing.T) {
r := &CountingReconciler{}
impl := NewImplWithStats(r, TestLogger(t), "Testing", &FakeStatsReporter{})
stopCh := make(chan struct{})
ctx, cancel := context.WithCancel(context.Background())
doneCh := make(chan struct{})
go func() {
defer close(doneCh)
StartAll(stopCh, impl)
StartAll(ctx, impl)
}()
impl.GlobalResync(&dummyInformer{})
@ -1037,11 +1037,11 @@ func TestImplGlobalResync(t *testing.T) {
// goes up to len(dummyObjs) times a second.
select {
case <-time.After((1 + 3) * time.Second):
// We don't expect completion before the stopCh closes.
// We don't expect completion before the context is cancelled.
case <-doneCh:
t.Error("StartAll finished early.")
}
close(stopCh)
cancel()
select {
case <-time.After(1 * time.Second):

View File

@ -180,7 +180,7 @@ func MainWithConfig(ctx context.Context, component string, cfg *rest.Config, cto
logger.Fatalw("Failed to start informers", zap.Error(err))
}
logger.Info("Starting controllers...")
go controller.StartAll(ctx.Done(), controllers...)
go controller.StartAll(ctx, controllers...)
<-ctx.Done()
}
@ -266,7 +266,7 @@ func WebhookMainWithConfig(ctx context.Context, component string, cfg *rest.Conf
wh.InformersHaveSynced()
}
logger.Info("Starting controllers...")
go controller.StartAll(ctx.Done(), controllers...)
go controller.StartAll(ctx, controllers...)
// This will block until either a signal arrives or one of the grouped functions
// returns an error.