mirror of https://github.com/knative/pkg.git
Simplify the core controller logic. (#184)
I was hoping to switch from a simple string key to a struct to include additional metadata into our workqueue item (in particular the time at which it was queued), but the great unit tests here reminded me that this would not properly deduplicate keys (they would have different queue times!), so I'm settling for simplifying the loop in a few ways: 1. Keys can only be strings, so remove some error checking and a test. 1. We were wrapping a block in a `func` because it contained `defer`, but the outer function had nothing in it. I'd still like to try and find a way to track queue times for keys, so that we can surface two interesting metrics: 1. How long are things spending queued? 1. How long between events being observed and us responding? (above + reconcile latency)
This commit is contained in:
parent
0f793b2942
commit
102237ce9b
|
@ -235,74 +235,52 @@ func (c *Impl) processNextWorkItem() bool {
|
|||
if shutdown {
|
||||
return false
|
||||
}
|
||||
key := obj.(string)
|
||||
|
||||
// We wrap this block in a func so we can defer c.base.WorkQueue.Done.
|
||||
err := func(obj interface{}) error {
|
||||
startTime := time.Now()
|
||||
// Send the metrics for the current queue depth
|
||||
c.statsReporter.ReportQueueDepth(int64(c.WorkQueue.Len()))
|
||||
startTime := time.Now()
|
||||
// Send the metrics for the current queue depth
|
||||
c.statsReporter.ReportQueueDepth(int64(c.WorkQueue.Len()))
|
||||
|
||||
// We call Done here so the workqueue knows we have finished
|
||||
// processing this item. We also must remember to call Forget if we
|
||||
// do not want this work item being re-queued. For example, we do
|
||||
// not call Forget if a transient error occurs, instead the item is
|
||||
// put back on the workqueue and attempted again after a back-off
|
||||
// period.
|
||||
defer c.WorkQueue.Done(obj)
|
||||
// We call Done here so the workqueue knows we have finished
|
||||
// processing this item. We also must remember to call Forget if we
|
||||
// do not want this work item being re-queued. For example, we do
|
||||
// not call Forget if a transient error occurs, instead the item is
|
||||
// put back on the workqueue and attempted again after a back-off
|
||||
// period.
|
||||
defer c.WorkQueue.Done(key)
|
||||
|
||||
// We expect strings to come off the workqueue. These are of the
|
||||
// form namespace/name. We do this as the delayed nature of the
|
||||
// workqueue means the items in the informer cache may actually be
|
||||
// more up to date that when the item was initially put onto the
|
||||
// workqueue.
|
||||
key, ok := obj.(string)
|
||||
if !ok {
|
||||
// As the item in the workqueue is actually invalid, we call
|
||||
// Forget here else we'd go into a loop of attempting to
|
||||
// process a work item that is invalid.
|
||||
c.WorkQueue.Forget(obj)
|
||||
c.logger.Errorf("expected string in workqueue but got %#v", obj)
|
||||
c.statsReporter.ReportReconcile(time.Now().Sub(startTime), "[InvalidKeyType]", falseString)
|
||||
return nil
|
||||
var err error
|
||||
defer func() {
|
||||
status := trueString
|
||||
if err != nil {
|
||||
status = falseString
|
||||
}
|
||||
c.statsReporter.ReportReconcile(time.Now().Sub(startTime), key, status)
|
||||
}()
|
||||
|
||||
var err error
|
||||
defer func() {
|
||||
status := trueString
|
||||
if err != nil {
|
||||
status = falseString
|
||||
}
|
||||
c.statsReporter.ReportReconcile(time.Now().Sub(startTime), key, status)
|
||||
}()
|
||||
// Embed the key into the logger and attach that to the context we pass
|
||||
// to the Reconciler.
|
||||
logger := c.logger.With(zap.String(logkey.Key, key))
|
||||
ctx := logging.WithLogger(context.TODO(), logger)
|
||||
|
||||
// Embed the key into the logger and attach that to the context we pass
|
||||
// to the Reconciler.
|
||||
logger := c.logger.With(zap.String(logkey.Key, key))
|
||||
ctx := logging.WithLogger(context.TODO(), logger)
|
||||
|
||||
// Run Reconcile, passing it the namespace/name string of the
|
||||
// resource to be synced.
|
||||
if err = c.Reconciler.Reconcile(ctx, key); err != nil {
|
||||
c.handleErr(err, key)
|
||||
return fmt.Errorf("error syncing %q: %v", key, err)
|
||||
}
|
||||
|
||||
// Finally, if no error occurs we Forget this item so it does not
|
||||
// get queued again until another change happens.
|
||||
c.WorkQueue.Forget(obj)
|
||||
c.logger.Infof("Successfully synced %q", key)
|
||||
return nil
|
||||
}(obj)
|
||||
|
||||
if err != nil {
|
||||
c.logger.Error(zap.Error(err))
|
||||
// Run Reconcile, passing it the namespace/name string of the
|
||||
// resource to be synced.
|
||||
if err = c.Reconciler.Reconcile(ctx, key); err != nil {
|
||||
c.handleErr(err, key)
|
||||
return true
|
||||
}
|
||||
|
||||
// Finally, if no error occurs we Forget this item so it does not
|
||||
// get queued again until another change happens.
|
||||
c.WorkQueue.Forget(key)
|
||||
c.logger.Infof("Successfully synced %q", key)
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func (c *Impl) handleErr(err error, key interface{}) {
|
||||
func (c *Impl) handleErr(err error, key string) {
|
||||
c.logger.Error(zap.Error(err))
|
||||
|
||||
// Re-queue the key if it's an transient error.
|
||||
if !IsPermanentError(err) {
|
||||
c.WorkQueue.AddRateLimited(key)
|
||||
|
|
|
@ -555,39 +555,6 @@ func TestStartAndShutdownWithPermanentErroringWork(t *testing.T) {
|
|||
checkStats(t, reporter, 1, 0, 1, falseString)
|
||||
}
|
||||
|
||||
func TestStartAndShutdownWithInvalidWork(t *testing.T) {
|
||||
r := &CountingReconciler{}
|
||||
reporter := &FakeStatsReporter{}
|
||||
impl := NewImpl(r, TestLogger(t), "Testing", reporter)
|
||||
|
||||
stopCh := make(chan struct{})
|
||||
|
||||
// Add a nonsense work item, which we couldn't ordinarily get into our workqueue.
|
||||
thing := struct{}{}
|
||||
impl.WorkQueue.AddRateLimited(thing)
|
||||
|
||||
var eg errgroup.Group
|
||||
eg.Go(func() error {
|
||||
return impl.Run(1, stopCh)
|
||||
})
|
||||
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
close(stopCh)
|
||||
|
||||
if err := eg.Wait(); err != nil {
|
||||
t.Errorf("Wait() = %v", err)
|
||||
}
|
||||
|
||||
if got, want := r.Count, 0; got != want {
|
||||
t.Errorf("Count = %v, wanted %v", got, want)
|
||||
}
|
||||
if got, want := impl.WorkQueue.NumRequeues(thing), 0; got != want {
|
||||
t.Errorf("Count = %v, wanted %v", got, want)
|
||||
}
|
||||
|
||||
checkStats(t, reporter, 1, 0, 1, falseString)
|
||||
}
|
||||
|
||||
func drainWorkQueue(wq workqueue.RateLimitingInterface) (hasQueue []string) {
|
||||
for {
|
||||
key, shutdown := wq.Get()
|
||||
|
|
Loading…
Reference in New Issue