mirror of https://github.com/knative/pkg.git
Wait for work to complete before returning from Run. (#259)
Partially addresses https://github.com/knative/serving/issues/3074
This commit is contained in:
parent
a118428db1
commit
18a6804326
|
@ -19,6 +19,7 @@ package controller
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"go.uber.org/zap"
|
"go.uber.org/zap"
|
||||||
|
@ -208,13 +209,17 @@ func (c *Impl) EnqueueKey(key string) {
|
||||||
// work queue and waits for workers to finish processing their current work items.
|
// work queue and waits for workers to finish processing their current work items.
|
||||||
func (c *Impl) Run(threadiness int, stopCh <-chan struct{}) error {
|
func (c *Impl) Run(threadiness int, stopCh <-chan struct{}) error {
|
||||||
defer runtime.HandleCrash()
|
defer runtime.HandleCrash()
|
||||||
|
sg := sync.WaitGroup{}
|
||||||
|
defer sg.Wait()
|
||||||
defer c.WorkQueue.ShutDown()
|
defer c.WorkQueue.ShutDown()
|
||||||
|
|
||||||
// Launch workers to process resources that get enqueued to our workqueue.
|
// Launch workers to process resources that get enqueued to our workqueue.
|
||||||
logger := c.logger
|
logger := c.logger
|
||||||
logger.Info("Starting controller and workers")
|
logger.Info("Starting controller and workers")
|
||||||
for i := 0; i < threadiness; i++ {
|
for i := 0; i < threadiness; i++ {
|
||||||
|
sg.Add(1)
|
||||||
go func() {
|
go func() {
|
||||||
|
defer sg.Done()
|
||||||
for c.processNextWorkItem() {
|
for c.processNextWorkItem() {
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
Loading…
Reference in New Issue