diff --git a/apis/duck/cached_test.go b/apis/duck/cached_test.go index 739e228d1..e275e0c2d 100644 --- a/apis/duck/cached_test.go +++ b/apis/duck/cached_test.go @@ -19,24 +19,25 @@ package duck import ( "context" "fmt" - "sync/atomic" "testing" "time" + "go.uber.org/atomic" "golang.org/x/sync/errgroup" + "k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/client-go/tools/cache" ) type BlockingInformerFactory struct { block chan struct{} - nCalls int32 + nCalls atomic.Int32 } var _ InformerFactory = (*BlockingInformerFactory)(nil) func (bif *BlockingInformerFactory) Get(ctx context.Context, gvr schema.GroupVersionResource) (cache.SharedIndexInformer, cache.GenericLister, error) { - atomic.AddInt32(&bif.nCalls, 1) + bif.nCalls.Inc() // Wait here until we can acquire the lock <-bif.block @@ -48,14 +49,16 @@ func (bif *BlockingInformerFactory) Get(ctx context.Context, gvr schema.GroupVer } func TestSameGVR(t *testing.T) { - bif := &BlockingInformerFactory{block: make(chan struct{})} + bif := &BlockingInformerFactory{ + block: make(chan struct{}), + } cif := &CachedInformerFactory{ Delegate: bif, } // counts the number of calls to cif.Get that returned - retGetCount := int32(0) + retGetCount := atomic.NewInt32(0) errGrp, ctx := errgroup.WithContext(context.Background()) @@ -72,7 +75,7 @@ func TestSameGVR(t *testing.T) { for i := 0; i < iter; i++ { errGrp.Go(func() error { _, _, err := cif.Get(ctx, gvr) - atomic.AddInt32(&retGetCount, 1) + retGetCount.Inc() return err }) } @@ -82,10 +85,10 @@ func TestSameGVR(t *testing.T) { // Check that no call to cif.Get have returned and bif.Get was called // only once. - if got, want := atomic.LoadInt32(&retGetCount), int32(0); got != want { + if got, want := retGetCount.Load(), int32(0); got != want { t.Errorf("Got %d returned call(s) to cif.Get, wanted %d", got, want) } - if got, want := atomic.LoadInt32(&bif.nCalls), int32(1); got != want { + if got, want := bif.nCalls.Load(), int32(1); got != want { t.Errorf("Got %d call(s) to bif.Get, wanted %d", got, want) } @@ -93,28 +96,30 @@ func TestSameGVR(t *testing.T) { close(bif.block) if err := errGrp.Wait(); err != nil { - t.Fatalf("Error while calling cif.Get: %v", err) + t.Fatal("Error while calling cif.Get:", err) } // Check that all calls to cif.Get have returned and calls to bif.Get // didn't increase. - if got, want := atomic.LoadInt32(&retGetCount), int32(iter); got != want { + if got, want := retGetCount.Load(), int32(iter); got != want { t.Errorf("Got %d returned call(s) to cif.Get, wanted %d", got, want) } - if got, want := atomic.LoadInt32(&bif.nCalls), int32(1); got != want { + if got, want := bif.nCalls.Load(), int32(1); got != want { t.Errorf("Got %d call(s) to bif.Get, wanted %d", got, want) } } func TestDifferentGVRs(t *testing.T) { - bif := &BlockingInformerFactory{block: make(chan struct{})} + bif := &BlockingInformerFactory{ + block: make(chan struct{}), + } cif := &CachedInformerFactory{ Delegate: bif, } // counts the number of calls to cif.Get that returned - retGetCount := int32(0) + retGetCount := atomic.NewInt32(0) errGrp, ctx := errgroup.WithContext(context.Background()) @@ -125,13 +130,13 @@ func TestDifferentGVRs(t *testing.T) { // for another GVR. gvr := schema.GroupVersionResource{ Group: "testing.knative.dev", - Version: fmt.Sprintf("v%d", i), + Version: fmt.Sprint("v", i), Resource: "caches", } errGrp.Go(func() error { _, _, err := cif.Get(ctx, gvr) - atomic.AddInt32(&retGetCount, 1) + retGetCount.Inc() return err }) } @@ -141,10 +146,10 @@ func TestDifferentGVRs(t *testing.T) { // Check that no call to cif.Get have returned and bif.Get was called // once per iteration. - if got, want := atomic.LoadInt32(&retGetCount), int32(0); got != want { + if got, want := retGetCount.Load(), int32(0); got != want { t.Errorf("Got %d returned call(s) to cif.Get, wanted %d", got, want) } - if got, want := atomic.LoadInt32(&bif.nCalls), int32(iter); got != want { + if got, want := bif.nCalls.Load(), int32(iter); got != want { t.Errorf("Got %d call(s) to bif.Get, wanted %d", got, want) } @@ -152,15 +157,15 @@ func TestDifferentGVRs(t *testing.T) { close(bif.block) if err := errGrp.Wait(); err != nil { - t.Fatalf("Error while calling cif.Get: %v", err) + t.Fatal("Error while calling cif.Get:", err) } // Check that all calls to cif.Get have returned and the number of // calls to bif.Get didn't increase. - if got, want := atomic.LoadInt32(&retGetCount), int32(iter); got != want { + if got, want := retGetCount.Load(), int32(iter); got != want { t.Errorf("Got %d returned call(s) to cif.Get, wanted %d", got, want) } - if got, want := atomic.LoadInt32(&bif.nCalls), int32(iter); got != want { + if got, want := bif.nCalls.Load(), int32(iter); got != want { t.Errorf("Got %d call(s) to bif.Get, wanted %d", got, want) } } diff --git a/metrics/metrics.go b/metrics/metrics.go index cb69c0157..5b53057ca 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -19,12 +19,12 @@ package metrics import ( "context" "net/url" - "sync/atomic" "time" "go.opencensus.io/stats" "go.opencensus.io/stats/view" "go.opencensus.io/tag" + "go.uber.org/atomic" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/metrics" "k8s.io/client-go/util/workqueue" @@ -67,22 +67,20 @@ func (m counterMetric) Inc() { type gaugeMetric struct { mutators []tag.Mutator measure *stats.Int64Measure - total int64 + total atomic.Int64 } -var ( - _ workqueue.GaugeMetric = (*gaugeMetric)(nil) -) +var _ workqueue.GaugeMetric = (*gaugeMetric)(nil) // Inc implements CounterMetric func (m *gaugeMetric) Inc() { - total := atomic.AddInt64(&m.total, 1) + total := m.total.Inc() Record(context.Background(), m.measure.M(total), stats.WithTags(m.mutators...)) } // Dec implements GaugeMetric func (m *gaugeMetric) Dec() { - total := atomic.AddInt64(&m.total, -1) + total := m.total.Dec() Record(context.Background(), m.measure.M(total), stats.WithTags(m.mutators...)) } @@ -112,9 +110,7 @@ type latencyMetric struct { measure *stats.Float64Measure } -var ( - _ metrics.LatencyMetric = (*latencyMetric)(nil) -) +var _ metrics.LatencyMetric = (*latencyMetric)(nil) // Observe implements LatencyMetric func (m latencyMetric) Observe(verb string, u url.URL, t time.Duration) { diff --git a/metrics/workqueue.go b/metrics/workqueue.go index 80a29cdc4..5b1188d1f 100644 --- a/metrics/workqueue.go +++ b/metrics/workqueue.go @@ -35,9 +35,7 @@ type WorkqueueProvider struct { WorkDuration *stats.Float64Measure } -var ( - _ workqueue.MetricsProvider = (*WorkqueueProvider)(nil) -) +var _ workqueue.MetricsProvider = (*WorkqueueProvider)(nil) // NewAddsMetric implements MetricsProvider func (wp *WorkqueueProvider) NewAddsMetric(name string) workqueue.CounterMetric { diff --git a/reconciler/testing/hooks.go b/reconciler/testing/hooks.go index cde3d7d21..f60d6ebed 100644 --- a/reconciler/testing/hooks.go +++ b/reconciler/testing/hooks.go @@ -20,9 +20,9 @@ package testing import ( "errors" "sync" - "sync/atomic" "time" + "go.uber.org/atomic" "k8s.io/apimachinery/pkg/runtime" kubetesting "k8s.io/client-go/testing" ) @@ -80,7 +80,7 @@ that all hooks complete in a timely manner. */ type Hooks struct { completionCh chan int32 - completionIndex int32 + completionIndex *atomic.Int32 // Denotes whether or not the registered hooks should no longer be called // because they have already been waited upon. @@ -96,14 +96,14 @@ type Hooks struct { func NewHooks() *Hooks { return &Hooks{ completionCh: make(chan int32, 100), - completionIndex: -1, + completionIndex: atomic.NewInt32(-1), } } // OnCreate attaches a create hook to the given Fake. The hook function is // executed every time a resource of the given type is created. func (h *Hooks) OnCreate(fake *kubetesting.Fake, resource string, rf CreateHookFunc) { - index := atomic.AddInt32(&h.completionIndex, 1) + index := h.completionIndex.Inc() fake.PrependReactor("create", resource, func(a kubetesting.Action) (bool, runtime.Object, error) { obj := a.(kubetesting.CreateActionImpl).Object @@ -119,7 +119,7 @@ func (h *Hooks) OnCreate(fake *kubetesting.Fake, resource string, rf CreateHookF // OnUpdate attaches an update hook to the given Fake. The hook function is // executed every time a resource of the given type is updated. func (h *Hooks) OnUpdate(fake *kubetesting.Fake, resource string, rf UpdateHookFunc) { - index := atomic.AddInt32(&h.completionIndex, 1) + index := h.completionIndex.Inc() fake.PrependReactor("update", resource, func(a kubetesting.Action) (bool, runtime.Object, error) { obj := a.(kubetesting.UpdateActionImpl).Object @@ -135,7 +135,7 @@ func (h *Hooks) OnUpdate(fake *kubetesting.Fake, resource string, rf UpdateHookF // OnDelete attaches a delete hook to the given Fake. The hook function is // executed every time a resource of the given type is deleted. func (h *Hooks) OnDelete(fake *kubetesting.Fake, resource string, rf DeleteHookFunc) { - index := atomic.AddInt32(&h.completionIndex, 1) + index := h.completionIndex.Inc() fake.PrependReactor("delete", resource, func(a kubetesting.Action) (bool, runtime.Object, error) { name := a.(kubetesting.DeleteActionImpl).Name @@ -159,7 +159,7 @@ func (h *Hooks) WaitForHooks(timeout time.Duration) error { h.closed = true }() - ci := int(atomic.LoadInt32(&h.completionIndex)) + ci := int(h.completionIndex.Load()) if ci == -1 { return nil } @@ -173,7 +173,7 @@ func (h *Hooks) WaitForHooks(timeout time.Duration) error { case i := <-h.completionCh: hookCompletions[i] = HookComplete if len(hookCompletions) == ci { - atomic.StoreInt32(&h.completionIndex, -1) + h.completionIndex.Dec() return nil } case <-timer: