Some more replacements to atomic.* types (#1747)

* Some more replacements to atomic.* types

* fix
This commit is contained in:
Victor Agababov 2020-09-25 10:54:44 -07:00 committed by GitHub
parent 7b99ff5a0b
commit d86a08b1d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 40 additions and 41 deletions

View File

@ -19,24 +19,25 @@ package duck
import ( import (
"context" "context"
"fmt" "fmt"
"sync/atomic"
"testing" "testing"
"time" "time"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
) )
type BlockingInformerFactory struct { type BlockingInformerFactory struct {
block chan struct{} block chan struct{}
nCalls int32 nCalls atomic.Int32
} }
var _ InformerFactory = (*BlockingInformerFactory)(nil) var _ InformerFactory = (*BlockingInformerFactory)(nil)
func (bif *BlockingInformerFactory) Get(ctx context.Context, gvr schema.GroupVersionResource) (cache.SharedIndexInformer, cache.GenericLister, error) { func (bif *BlockingInformerFactory) Get(ctx context.Context, gvr schema.GroupVersionResource) (cache.SharedIndexInformer, cache.GenericLister, error) {
atomic.AddInt32(&bif.nCalls, 1) bif.nCalls.Inc()
// Wait here until we can acquire the lock // Wait here until we can acquire the lock
<-bif.block <-bif.block
@ -48,14 +49,16 @@ func (bif *BlockingInformerFactory) Get(ctx context.Context, gvr schema.GroupVer
} }
func TestSameGVR(t *testing.T) { func TestSameGVR(t *testing.T) {
bif := &BlockingInformerFactory{block: make(chan struct{})} bif := &BlockingInformerFactory{
block: make(chan struct{}),
}
cif := &CachedInformerFactory{ cif := &CachedInformerFactory{
Delegate: bif, Delegate: bif,
} }
// counts the number of calls to cif.Get that returned // counts the number of calls to cif.Get that returned
retGetCount := int32(0) retGetCount := atomic.NewInt32(0)
errGrp, ctx := errgroup.WithContext(context.Background()) errGrp, ctx := errgroup.WithContext(context.Background())
@ -72,7 +75,7 @@ func TestSameGVR(t *testing.T) {
for i := 0; i < iter; i++ { for i := 0; i < iter; i++ {
errGrp.Go(func() error { errGrp.Go(func() error {
_, _, err := cif.Get(ctx, gvr) _, _, err := cif.Get(ctx, gvr)
atomic.AddInt32(&retGetCount, 1) retGetCount.Inc()
return err return err
}) })
} }
@ -82,10 +85,10 @@ func TestSameGVR(t *testing.T) {
// Check that no call to cif.Get have returned and bif.Get was called // Check that no call to cif.Get have returned and bif.Get was called
// only once. // only once.
if got, want := atomic.LoadInt32(&retGetCount), int32(0); got != want { if got, want := retGetCount.Load(), int32(0); got != want {
t.Errorf("Got %d returned call(s) to cif.Get, wanted %d", got, want) t.Errorf("Got %d returned call(s) to cif.Get, wanted %d", got, want)
} }
if got, want := atomic.LoadInt32(&bif.nCalls), int32(1); got != want { if got, want := bif.nCalls.Load(), int32(1); got != want {
t.Errorf("Got %d call(s) to bif.Get, wanted %d", got, want) t.Errorf("Got %d call(s) to bif.Get, wanted %d", got, want)
} }
@ -93,28 +96,30 @@ func TestSameGVR(t *testing.T) {
close(bif.block) close(bif.block)
if err := errGrp.Wait(); err != nil { if err := errGrp.Wait(); err != nil {
t.Fatalf("Error while calling cif.Get: %v", err) t.Fatal("Error while calling cif.Get:", err)
} }
// Check that all calls to cif.Get have returned and calls to bif.Get // Check that all calls to cif.Get have returned and calls to bif.Get
// didn't increase. // didn't increase.
if got, want := atomic.LoadInt32(&retGetCount), int32(iter); got != want { if got, want := retGetCount.Load(), int32(iter); got != want {
t.Errorf("Got %d returned call(s) to cif.Get, wanted %d", got, want) t.Errorf("Got %d returned call(s) to cif.Get, wanted %d", got, want)
} }
if got, want := atomic.LoadInt32(&bif.nCalls), int32(1); got != want { if got, want := bif.nCalls.Load(), int32(1); got != want {
t.Errorf("Got %d call(s) to bif.Get, wanted %d", got, want) t.Errorf("Got %d call(s) to bif.Get, wanted %d", got, want)
} }
} }
func TestDifferentGVRs(t *testing.T) { func TestDifferentGVRs(t *testing.T) {
bif := &BlockingInformerFactory{block: make(chan struct{})} bif := &BlockingInformerFactory{
block: make(chan struct{}),
}
cif := &CachedInformerFactory{ cif := &CachedInformerFactory{
Delegate: bif, Delegate: bif,
} }
// counts the number of calls to cif.Get that returned // counts the number of calls to cif.Get that returned
retGetCount := int32(0) retGetCount := atomic.NewInt32(0)
errGrp, ctx := errgroup.WithContext(context.Background()) errGrp, ctx := errgroup.WithContext(context.Background())
@ -125,13 +130,13 @@ func TestDifferentGVRs(t *testing.T) {
// for another GVR. // for another GVR.
gvr := schema.GroupVersionResource{ gvr := schema.GroupVersionResource{
Group: "testing.knative.dev", Group: "testing.knative.dev",
Version: fmt.Sprintf("v%d", i), Version: fmt.Sprint("v", i),
Resource: "caches", Resource: "caches",
} }
errGrp.Go(func() error { errGrp.Go(func() error {
_, _, err := cif.Get(ctx, gvr) _, _, err := cif.Get(ctx, gvr)
atomic.AddInt32(&retGetCount, 1) retGetCount.Inc()
return err return err
}) })
} }
@ -141,10 +146,10 @@ func TestDifferentGVRs(t *testing.T) {
// Check that no call to cif.Get have returned and bif.Get was called // Check that no call to cif.Get have returned and bif.Get was called
// once per iteration. // once per iteration.
if got, want := atomic.LoadInt32(&retGetCount), int32(0); got != want { if got, want := retGetCount.Load(), int32(0); got != want {
t.Errorf("Got %d returned call(s) to cif.Get, wanted %d", got, want) t.Errorf("Got %d returned call(s) to cif.Get, wanted %d", got, want)
} }
if got, want := atomic.LoadInt32(&bif.nCalls), int32(iter); got != want { if got, want := bif.nCalls.Load(), int32(iter); got != want {
t.Errorf("Got %d call(s) to bif.Get, wanted %d", got, want) t.Errorf("Got %d call(s) to bif.Get, wanted %d", got, want)
} }
@ -152,15 +157,15 @@ func TestDifferentGVRs(t *testing.T) {
close(bif.block) close(bif.block)
if err := errGrp.Wait(); err != nil { if err := errGrp.Wait(); err != nil {
t.Fatalf("Error while calling cif.Get: %v", err) t.Fatal("Error while calling cif.Get:", err)
} }
// Check that all calls to cif.Get have returned and the number of // Check that all calls to cif.Get have returned and the number of
// calls to bif.Get didn't increase. // calls to bif.Get didn't increase.
if got, want := atomic.LoadInt32(&retGetCount), int32(iter); got != want { if got, want := retGetCount.Load(), int32(iter); got != want {
t.Errorf("Got %d returned call(s) to cif.Get, wanted %d", got, want) t.Errorf("Got %d returned call(s) to cif.Get, wanted %d", got, want)
} }
if got, want := atomic.LoadInt32(&bif.nCalls), int32(iter); got != want { if got, want := bif.nCalls.Load(), int32(iter); got != want {
t.Errorf("Got %d call(s) to bif.Get, wanted %d", got, want) t.Errorf("Got %d call(s) to bif.Get, wanted %d", got, want)
} }
} }

View File

@ -19,12 +19,12 @@ package metrics
import ( import (
"context" "context"
"net/url" "net/url"
"sync/atomic"
"time" "time"
"go.opencensus.io/stats" "go.opencensus.io/stats"
"go.opencensus.io/stats/view" "go.opencensus.io/stats/view"
"go.opencensus.io/tag" "go.opencensus.io/tag"
"go.uber.org/atomic"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/metrics" "k8s.io/client-go/tools/metrics"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
@ -67,22 +67,20 @@ func (m counterMetric) Inc() {
type gaugeMetric struct { type gaugeMetric struct {
mutators []tag.Mutator mutators []tag.Mutator
measure *stats.Int64Measure measure *stats.Int64Measure
total int64 total atomic.Int64
} }
var ( var _ workqueue.GaugeMetric = (*gaugeMetric)(nil)
_ workqueue.GaugeMetric = (*gaugeMetric)(nil)
)
// Inc implements CounterMetric // Inc implements CounterMetric
func (m *gaugeMetric) Inc() { func (m *gaugeMetric) Inc() {
total := atomic.AddInt64(&m.total, 1) total := m.total.Inc()
Record(context.Background(), m.measure.M(total), stats.WithTags(m.mutators...)) Record(context.Background(), m.measure.M(total), stats.WithTags(m.mutators...))
} }
// Dec implements GaugeMetric // Dec implements GaugeMetric
func (m *gaugeMetric) Dec() { func (m *gaugeMetric) Dec() {
total := atomic.AddInt64(&m.total, -1) total := m.total.Dec()
Record(context.Background(), m.measure.M(total), stats.WithTags(m.mutators...)) Record(context.Background(), m.measure.M(total), stats.WithTags(m.mutators...))
} }
@ -112,9 +110,7 @@ type latencyMetric struct {
measure *stats.Float64Measure measure *stats.Float64Measure
} }
var ( var _ metrics.LatencyMetric = (*latencyMetric)(nil)
_ metrics.LatencyMetric = (*latencyMetric)(nil)
)
// Observe implements LatencyMetric // Observe implements LatencyMetric
func (m latencyMetric) Observe(verb string, u url.URL, t time.Duration) { func (m latencyMetric) Observe(verb string, u url.URL, t time.Duration) {

View File

@ -35,9 +35,7 @@ type WorkqueueProvider struct {
WorkDuration *stats.Float64Measure WorkDuration *stats.Float64Measure
} }
var ( var _ workqueue.MetricsProvider = (*WorkqueueProvider)(nil)
_ workqueue.MetricsProvider = (*WorkqueueProvider)(nil)
)
// NewAddsMetric implements MetricsProvider // NewAddsMetric implements MetricsProvider
func (wp *WorkqueueProvider) NewAddsMetric(name string) workqueue.CounterMetric { func (wp *WorkqueueProvider) NewAddsMetric(name string) workqueue.CounterMetric {

View File

@ -20,9 +20,9 @@ package testing
import ( import (
"errors" "errors"
"sync" "sync"
"sync/atomic"
"time" "time"
"go.uber.org/atomic"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
kubetesting "k8s.io/client-go/testing" kubetesting "k8s.io/client-go/testing"
) )
@ -80,7 +80,7 @@ that all hooks complete in a timely manner.
*/ */
type Hooks struct { type Hooks struct {
completionCh chan int32 completionCh chan int32
completionIndex int32 completionIndex *atomic.Int32
// Denotes whether or not the registered hooks should no longer be called // Denotes whether or not the registered hooks should no longer be called
// because they have already been waited upon. // because they have already been waited upon.
@ -96,14 +96,14 @@ type Hooks struct {
func NewHooks() *Hooks { func NewHooks() *Hooks {
return &Hooks{ return &Hooks{
completionCh: make(chan int32, 100), completionCh: make(chan int32, 100),
completionIndex: -1, completionIndex: atomic.NewInt32(-1),
} }
} }
// OnCreate attaches a create hook to the given Fake. The hook function is // OnCreate attaches a create hook to the given Fake. The hook function is
// executed every time a resource of the given type is created. // executed every time a resource of the given type is created.
func (h *Hooks) OnCreate(fake *kubetesting.Fake, resource string, rf CreateHookFunc) { func (h *Hooks) OnCreate(fake *kubetesting.Fake, resource string, rf CreateHookFunc) {
index := atomic.AddInt32(&h.completionIndex, 1) index := h.completionIndex.Inc()
fake.PrependReactor("create", resource, func(a kubetesting.Action) (bool, runtime.Object, error) { fake.PrependReactor("create", resource, func(a kubetesting.Action) (bool, runtime.Object, error) {
obj := a.(kubetesting.CreateActionImpl).Object obj := a.(kubetesting.CreateActionImpl).Object
@ -119,7 +119,7 @@ func (h *Hooks) OnCreate(fake *kubetesting.Fake, resource string, rf CreateHookF
// OnUpdate attaches an update hook to the given Fake. The hook function is // OnUpdate attaches an update hook to the given Fake. The hook function is
// executed every time a resource of the given type is updated. // executed every time a resource of the given type is updated.
func (h *Hooks) OnUpdate(fake *kubetesting.Fake, resource string, rf UpdateHookFunc) { func (h *Hooks) OnUpdate(fake *kubetesting.Fake, resource string, rf UpdateHookFunc) {
index := atomic.AddInt32(&h.completionIndex, 1) index := h.completionIndex.Inc()
fake.PrependReactor("update", resource, func(a kubetesting.Action) (bool, runtime.Object, error) { fake.PrependReactor("update", resource, func(a kubetesting.Action) (bool, runtime.Object, error) {
obj := a.(kubetesting.UpdateActionImpl).Object obj := a.(kubetesting.UpdateActionImpl).Object
@ -135,7 +135,7 @@ func (h *Hooks) OnUpdate(fake *kubetesting.Fake, resource string, rf UpdateHookF
// OnDelete attaches a delete hook to the given Fake. The hook function is // OnDelete attaches a delete hook to the given Fake. The hook function is
// executed every time a resource of the given type is deleted. // executed every time a resource of the given type is deleted.
func (h *Hooks) OnDelete(fake *kubetesting.Fake, resource string, rf DeleteHookFunc) { func (h *Hooks) OnDelete(fake *kubetesting.Fake, resource string, rf DeleteHookFunc) {
index := atomic.AddInt32(&h.completionIndex, 1) index := h.completionIndex.Inc()
fake.PrependReactor("delete", resource, func(a kubetesting.Action) (bool, runtime.Object, error) { fake.PrependReactor("delete", resource, func(a kubetesting.Action) (bool, runtime.Object, error) {
name := a.(kubetesting.DeleteActionImpl).Name name := a.(kubetesting.DeleteActionImpl).Name
@ -159,7 +159,7 @@ func (h *Hooks) WaitForHooks(timeout time.Duration) error {
h.closed = true h.closed = true
}() }()
ci := int(atomic.LoadInt32(&h.completionIndex)) ci := int(h.completionIndex.Load())
if ci == -1 { if ci == -1 {
return nil return nil
} }
@ -173,7 +173,7 @@ func (h *Hooks) WaitForHooks(timeout time.Duration) error {
case i := <-h.completionCh: case i := <-h.completionCh:
hookCompletions[i] = HookComplete hookCompletions[i] = HookComplete
if len(hookCompletions) == ci { if len(hookCompletions) == ci {
atomic.StoreInt32(&h.completionIndex, -1) h.completionIndex.Dec()
return nil return nil
} }
case <-timer: case <-timer: