Plumb through a structured key, keep current behavior. (#703)

* Plumb through a structured key, keep current behavior.

* Rename variable.
This commit is contained in:
Markus Thömmes 2019-09-20 15:52:05 +02:00 committed by Knative Prow Robot
parent dcfc14495f
commit 4a790dd36c
2 changed files with 57 additions and 48 deletions

View File

@ -27,6 +27,7 @@ import (
"go.uber.org/zap"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
@ -155,23 +156,23 @@ func NewImplWithStats(r Reconciler, logger *zap.SugaredLogger, workQueueName str
// EnqueueAfter takes a resource, converts it into a namespace/name string,
// and passes it to EnqueueKey.
func (c *Impl) EnqueueAfter(obj interface{}, after time.Duration) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
object, err := kmeta.DeletionHandlingAccessor(obj)
if err != nil {
c.logger.Errorw("Enqueue", zap.Error(err))
return
}
c.EnqueueKeyAfter(key, after)
c.EnqueueKeyAfter(types.NamespacedName{Namespace: object.GetNamespace(), Name: object.GetName()}, after)
}
// Enqueue takes a resource, converts it into a namespace/name string,
// and passes it to EnqueueKey.
func (c *Impl) Enqueue(obj interface{}) {
key, err := cache.DeletionHandlingMetaNamespaceKeyFunc(obj)
object, err := kmeta.DeletionHandlingAccessor(obj)
if err != nil {
c.logger.Errorw("Enqueue", zap.Error(err))
return
}
c.EnqueueKey(key)
c.EnqueueKey(types.NamespacedName{Namespace: object.GetNamespace(), Name: object.GetName()})
}
// EnqueueControllerOf takes a resource, identifies its controller resource,
@ -186,7 +187,7 @@ func (c *Impl) EnqueueControllerOf(obj interface{}) {
// If we can determine the controller ref of this object, then
// add that object to our workqueue.
if owner := metav1.GetControllerOf(object); owner != nil {
c.EnqueueKey(object.GetNamespace() + "/" + owner.Name)
c.EnqueueKey(types.NamespacedName{Namespace: object.GetNamespace(), Name: owner.Name})
}
}
@ -218,14 +219,14 @@ func (c *Impl) EnqueueLabelOfNamespaceScopedResource(namespaceLabel, nameLabel s
return
}
c.EnqueueKey(fmt.Sprintf("%s/%s", controllerNamespace, controllerKey))
c.EnqueueKey(types.NamespacedName{Namespace: controllerNamespace, Name: controllerKey})
return
}
// Pass through namespace of the object itself if no namespace label specified.
// This is for the scenario that object and the parent resource are of same namespace,
// e.g. to enqueue the revision of an endpoint.
c.EnqueueKey(fmt.Sprintf("%s/%s", object.GetNamespace(), controllerKey))
c.EnqueueKey(types.NamespacedName{Namespace: object.GetNamespace(), Name: controllerKey})
}
}
@ -249,21 +250,27 @@ func (c *Impl) EnqueueLabelOfClusterScopedResource(nameLabel string) func(obj in
return
}
c.EnqueueKey(controllerKey)
ns, name, err := cache.SplitMetaNamespaceKey(controllerKey)
if err != nil {
c.logger.Error(err)
return
}
c.EnqueueKey(types.NamespacedName{Namespace: ns, Name: name})
}
}
// EnqueueKey takes a namespace/name string and puts it onto the work queue.
func (c *Impl) EnqueueKey(key string) {
func (c *Impl) EnqueueKey(key types.NamespacedName) {
c.WorkQueue.Add(key)
c.logger.Debugf("Adding to queue %s (depth: %d)", key, c.WorkQueue.Len())
c.logger.Debugf("Adding to queue %s (depth: %d)", key.String(), c.WorkQueue.Len())
}
// EnqueueKeyAfter takes a namespace/name string and schedules its execution in
// the work queue after given delay.
func (c *Impl) EnqueueKeyAfter(key string, delay time.Duration) {
func (c *Impl) EnqueueKeyAfter(key types.NamespacedName, delay time.Duration) {
c.WorkQueue.AddAfter(key, delay)
c.logger.Debugf("Adding to queue %s (delay: %v, depth: %d)", key, delay, c.WorkQueue.Len())
c.logger.Debugf("Adding to queue %s (delay: %v, depth: %d)", key.String(), delay, c.WorkQueue.Len())
}
// Run starts the controller's worker threads, the number of which is threadiness.
@ -306,7 +313,8 @@ func (c *Impl) processNextWorkItem() bool {
if shutdown {
return false
}
key := obj.(string)
key := obj.(types.NamespacedName)
keyStr := key.String()
c.logger.Debugf("Processing from queue %s (depth: %d)", key, c.WorkQueue.Len())
@ -327,17 +335,17 @@ func (c *Impl) processNextWorkItem() bool {
if err != nil {
status = falseString
}
c.statsReporter.ReportReconcile(time.Since(startTime), key, status)
c.statsReporter.ReportReconcile(time.Since(startTime), keyStr, status)
}()
// Embed the key into the logger and attach that to the context we pass
// to the Reconciler.
logger := c.logger.With(zap.String(logkey.TraceId, uuid.New().String()), zap.String(logkey.Key, key))
logger := c.logger.With(zap.String(logkey.TraceId, uuid.New().String()), zap.String(logkey.Key, keyStr))
ctx := logging.WithLogger(context.TODO(), logger)
// Run Reconcile, passing it the namespace/name string of the
// resource to be synced.
if err = c.Reconciler.Reconcile(ctx, key); err != nil {
if err = c.Reconciler.Reconcile(ctx, keyStr); err != nil {
c.handleErr(err, key)
logger.Infof("Reconcile failed. Time taken: %v.", time.Since(startTime))
return true
@ -351,7 +359,7 @@ func (c *Impl) processNextWorkItem() bool {
return true
}
func (c *Impl) handleErr(err error, key string) {
func (c *Impl) handleErr(err error, key types.NamespacedName) {
c.logger.Errorw("Reconcile error", zap.Error(err))
// Re-queue the key if it's an transient error.
@ -360,7 +368,7 @@ func (c *Impl) handleErr(err error, key string) {
// being processed, queue.Len==0).
if !IsPermanentError(err) && !c.WorkQueue.ShuttingDown() {
c.WorkQueue.AddRateLimited(key)
c.logger.Debugf("Requeuing key %s due to non-permanent error (depth: %d)", key, c.WorkQueue.Len())
c.logger.Debugf("Requeuing key %s due to non-permanent error (depth: %d)", key.String(), c.WorkQueue.Len())
return
}

View File

@ -26,6 +26,7 @@ import (
"github.com/google/go-cmp/cmp"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
@ -236,31 +237,31 @@ func TestEnqueues(t *testing.T) {
tests := []struct {
name string
work func(*Impl)
wantQueue []string
wantQueue []types.NamespacedName
}{{
name: "do nothing",
work: func(*Impl) {},
}, {
name: "enqueue key",
work: func(impl *Impl) {
impl.EnqueueKey("foo/bar")
impl.EnqueueKey(types.NamespacedName{Namespace: "foo", Name: "bar"})
},
wantQueue: []string{"foo/bar"},
wantQueue: []types.NamespacedName{{Namespace: "foo", Name: "bar"}},
}, {
name: "enqueue duplicate key",
work: func(impl *Impl) {
impl.EnqueueKey("foo/bar")
impl.EnqueueKey("foo/bar")
impl.EnqueueKey(types.NamespacedName{Namespace: "foo", Name: "bar"})
impl.EnqueueKey(types.NamespacedName{Namespace: "foo", Name: "bar"})
},
// The queue deduplicates.
wantQueue: []string{"foo/bar"},
wantQueue: []types.NamespacedName{{Namespace: "foo", Name: "bar"}},
}, {
name: "enqueue different keys",
work: func(impl *Impl) {
impl.EnqueueKey("foo/bar")
impl.EnqueueKey("foo/baz")
impl.EnqueueKey(types.NamespacedName{Namespace: "foo", Name: "bar"})
impl.EnqueueKey(types.NamespacedName{Namespace: "foo", Name: "baz"})
},
wantQueue: []string{"foo/bar", "foo/baz"},
wantQueue: []types.NamespacedName{{Namespace: "foo", Name: "bar"}, {Namespace: "foo", Name: "baz"}},
}, {
name: "enqueue resource",
work: func(impl *Impl) {
@ -271,7 +272,7 @@ func TestEnqueues(t *testing.T) {
},
})
},
wantQueue: []string{"bar/foo"},
wantQueue: []types.NamespacedName{{Namespace: "bar", Name: "foo"}},
}, {
name: "enqueue bad resource",
work: func(impl *Impl) {
@ -308,7 +309,7 @@ func TestEnqueues(t *testing.T) {
},
})
},
wantQueue: []string{"bar/baz"},
wantQueue: []types.NamespacedName{{Namespace: "bar", Name: "baz"}},
}, {
name: "enqueue controller of deleted resource with owner",
work: func(impl *Impl) {
@ -328,7 +329,7 @@ func TestEnqueues(t *testing.T) {
},
})
},
wantQueue: []string{"bar/baz"},
wantQueue: []types.NamespacedName{{Namespace: "bar", Name: "baz"}},
}, {
name: "enqueue controller of deleted bad resource",
work: func(impl *Impl) {
@ -382,7 +383,7 @@ func TestEnqueues(t *testing.T) {
},
})
},
wantQueue: []string{"qux/baz"},
wantQueue: []types.NamespacedName{{Namespace: "qux", Name: "baz"}},
}, {
name: "enqueue label of namespaced resource with empty namespace label",
work: func(impl *Impl) {
@ -396,7 +397,7 @@ func TestEnqueues(t *testing.T) {
},
})
},
wantQueue: []string{"bar/baz"},
wantQueue: []types.NamespacedName{{Namespace: "bar", Name: "baz"}},
}, {
name: "enqueue label of deleted namespaced resource with label",
work: func(impl *Impl) {
@ -414,7 +415,7 @@ func TestEnqueues(t *testing.T) {
},
})
},
wantQueue: []string{"qux/baz"},
wantQueue: []types.NamespacedName{{Namespace: "qux", Name: "baz"}},
}, {
name: "enqueue label of deleted bad namespaced resource",
work: func(impl *Impl) {
@ -452,7 +453,7 @@ func TestEnqueues(t *testing.T) {
},
})
},
wantQueue: []string{"baz"},
wantQueue: []types.NamespacedName{{Namespace: "", Name: "baz"}},
}, {
name: "enqueue label of deleted cluster scoped resource with label",
work: func(impl *Impl) {
@ -469,7 +470,7 @@ func TestEnqueues(t *testing.T) {
},
})
},
wantQueue: []string{"baz"},
wantQueue: []types.NamespacedName{{Namespace: "", Name: "baz"}},
}, {
name: "enqueue label of deleted bad cluster scoped resource",
work: func(impl *Impl) {
@ -529,7 +530,7 @@ func TestEnqeueAfter(t *testing.T) {
t.Errorf("|Queue| = %d, want: %d", got, want)
}
impl.WorkQueue.ShutDown()
if got, want := drainWorkQueue(impl.WorkQueue), []string{"the/waterfall"}; !cmp.Equal(got, want) {
if got, want := drainWorkQueue(impl.WorkQueue), []types.NamespacedName{{Namespace: "the", Name: "waterfall"}}; !cmp.Equal(got, want) {
t.Errorf("Queue = %v, want: %v, diff: %s", got, want, cmp.Diff(got, want))
}
}
@ -537,9 +538,9 @@ func TestEnqeueAfter(t *testing.T) {
func TestEnqeueKeyAfter(t *testing.T) {
defer ClearAll()
impl := NewImplWithStats(&NopReconciler{}, TestLogger(t), "Testing", &FakeStatsReporter{})
impl.EnqueueKeyAfter("waiting/for", 5*time.Second)
impl.EnqueueKeyAfter("the/waterfall", 500*time.Millisecond)
impl.EnqueueKeyAfter("to/fall", 20*time.Second)
impl.EnqueueKeyAfter(types.NamespacedName{Namespace: "waiting", Name: "for"}, 5*time.Second)
impl.EnqueueKeyAfter(types.NamespacedName{Namespace: "the", Name: "waterfall"}, 500*time.Millisecond)
impl.EnqueueKeyAfter(types.NamespacedName{Namespace: "to", Name: "fall"}, 20*time.Second)
time.Sleep(10 * time.Millisecond)
if got, want := impl.WorkQueue.Len(), 0; got != want {
t.Errorf("|Queue| = %d, want: %d", got, want)
@ -550,7 +551,7 @@ func TestEnqeueKeyAfter(t *testing.T) {
t.Errorf("|Queue| = %d, want: %d", got, want)
}
impl.WorkQueue.ShutDown()
if got, want := drainWorkQueue(impl.WorkQueue), []string{"the/waterfall"}; !cmp.Equal(got, want) {
if got, want := drainWorkQueue(impl.WorkQueue), []types.NamespacedName{{Namespace: "the", Name: "waterfall"}}; !cmp.Equal(got, want) {
t.Errorf("Queue = %v, want: %v, diff: %s", got, want, cmp.Diff(got, want))
}
}
@ -609,7 +610,7 @@ func TestStartAndShutdownWithWork(t *testing.T) {
stopCh := make(chan struct{})
doneCh := make(chan struct{})
impl.EnqueueKey("foo/bar")
impl.EnqueueKey(types.NamespacedName{Namespace: "foo", Name: "bar"})
go func() {
defer close(doneCh)
@ -634,7 +635,7 @@ func TestStartAndShutdownWithWork(t *testing.T) {
if got, want := r.Count, 1; got != want {
t.Errorf("Count = %v, wanted %v", got, want)
}
if got, want := impl.WorkQueue.NumRequeues("foo/bar"), 0; got != want {
if got, want := impl.WorkQueue.NumRequeues(types.NamespacedName{Namespace: "foo", Name: "bar"}), 0; got != want {
t.Errorf("Count = %v, wanted %v", got, want)
}
@ -656,7 +657,7 @@ func TestStartAndShutdownWithErroringWork(t *testing.T) {
stopCh := make(chan struct{})
doneCh := make(chan struct{})
impl.EnqueueKey("foo/bar")
impl.EnqueueKey(types.NamespacedName{Namespace: "foo", Name: "bar"})
go func() {
defer close(doneCh)
@ -690,7 +691,7 @@ func TestStartAndShutdownWithErroringWork(t *testing.T) {
// As NumRequeues can't fully reflect the real state of queue length.
// Here we need to wait for NumRequeues to be more than 1, to ensure
// the key get re-queued and reprocessed as expect.
if got, wantAtLeast := impl.WorkQueue.NumRequeues("foo/bar"), 2; got < wantAtLeast {
if got, wantAtLeast := impl.WorkQueue.NumRequeues(types.NamespacedName{Namespace: "foo", Name: "bar"}), 2; got < wantAtLeast {
t.Errorf("Requeue count = %v, wanted at least %v", got, wantAtLeast)
}
}
@ -711,7 +712,7 @@ func TestStartAndShutdownWithPermanentErroringWork(t *testing.T) {
stopCh := make(chan struct{})
doneCh := make(chan struct{})
impl.EnqueueKey("foo/bar")
impl.EnqueueKey(types.NamespacedName{Namespace: "foo", Name: "bar"})
go func() {
defer close(doneCh)
@ -734,20 +735,20 @@ func TestStartAndShutdownWithPermanentErroringWork(t *testing.T) {
}
// Check that the work was not requeued in RateLimiter.
if got, want := impl.WorkQueue.NumRequeues("foo/bar"), 0; got != want {
if got, want := impl.WorkQueue.NumRequeues(types.NamespacedName{Namespace: "foo", Name: "bar"}), 0; got != want {
t.Errorf("Requeue count = %v, wanted %v", got, want)
}
checkStats(t, reporter, 1, 0, 1, falseString)
}
func drainWorkQueue(wq workqueue.RateLimitingInterface) (hasQueue []string) {
func drainWorkQueue(wq workqueue.RateLimitingInterface) (hasQueue []types.NamespacedName) {
for {
key, shutdown := wq.Get()
if key == nil && shutdown {
break
}
hasQueue = append(hasQueue, key.(string))
hasQueue = append(hasQueue, key.(types.NamespacedName))
}
return
}