Use go sync/atomic instead of go.uber.org/atomic (#2777)

* wip

* tiny fix

* Fix controller/controller_test.go

* fix metrics.go

* Fix profiling/server.go

* Fix reconciler/testing

* update ./test/spoof/spoof_test.go

* Fix ./test/zipkin/util.go

* update go.uber.org/atomic
This commit is contained in:
Kenjiro Nakayama 2023-08-03 01:26:48 +09:00 committed by GitHub
parent b3a65a0194
commit c11003ae6d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 34 additions and 31 deletions

View File

@ -19,10 +19,10 @@ package duck
import ( import (
"context" "context"
"fmt" "fmt"
"sync/atomic"
"testing" "testing"
"time" "time"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup" "golang.org/x/sync/errgroup"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
@ -37,7 +37,7 @@ type BlockingInformerFactory struct {
var _ InformerFactory = (*BlockingInformerFactory)(nil) var _ InformerFactory = (*BlockingInformerFactory)(nil)
func (bif *BlockingInformerFactory) Get(ctx context.Context, gvr schema.GroupVersionResource) (cache.SharedIndexInformer, cache.GenericLister, error) { func (bif *BlockingInformerFactory) Get(ctx context.Context, gvr schema.GroupVersionResource) (cache.SharedIndexInformer, cache.GenericLister, error) {
bif.nCalls.Inc() bif.nCalls.Add(1)
// Wait here until we can acquire the lock // Wait here until we can acquire the lock
<-bif.block <-bif.block
@ -58,7 +58,7 @@ func TestSameGVR(t *testing.T) {
} }
// counts the number of calls to cif.Get that returned // counts the number of calls to cif.Get that returned
retGetCount := atomic.NewInt32(0) var retGetCount atomic.Int32
errGrp, ctx := errgroup.WithContext(context.Background()) errGrp, ctx := errgroup.WithContext(context.Background())
@ -75,7 +75,7 @@ func TestSameGVR(t *testing.T) {
for i := 0; i < iter; i++ { for i := 0; i < iter; i++ {
errGrp.Go(func() error { errGrp.Go(func() error {
_, _, err := cif.Get(ctx, gvr) _, _, err := cif.Get(ctx, gvr)
retGetCount.Inc() retGetCount.Add(1)
return err return err
}) })
} }
@ -119,7 +119,7 @@ func TestDifferentGVRs(t *testing.T) {
} }
// counts the number of calls to cif.Get that returned // counts the number of calls to cif.Get that returned
retGetCount := atomic.NewInt32(0) var retGetCount atomic.Int32
errGrp, ctx := errgroup.WithContext(context.Background()) errGrp, ctx := errgroup.WithContext(context.Background())
@ -136,7 +136,7 @@ func TestDifferentGVRs(t *testing.T) {
errGrp.Go(func() error { errGrp.Go(func() error {
_, _, err := cif.Get(ctx, gvr) _, _, err := cif.Get(ctx, gvr)
retGetCount.Inc() retGetCount.Add(1)
return err return err
}) })
} }

View File

@ -21,11 +21,11 @@ import (
"errors" "errors"
"fmt" "fmt"
"sync" "sync"
"sync/atomic"
"testing" "testing"
"time" "time"
"github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp"
"go.uber.org/atomic"
coordinationv1 "k8s.io/api/coordination/v1" coordinationv1 "k8s.io/api/coordination/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -874,7 +874,7 @@ type CountingReconciler struct {
} }
func (cr *CountingReconciler) Reconcile(context.Context, string) error { func (cr *CountingReconciler) Reconcile(context.Context, string) error {
cr.count.Inc() cr.count.Add(1)
return nil return nil
} }
@ -927,7 +927,7 @@ type countingLeaderAwareReconciler struct {
var _ reconciler.LeaderAware = (*countingLeaderAwareReconciler)(nil) var _ reconciler.LeaderAware = (*countingLeaderAwareReconciler)(nil)
func (cr *countingLeaderAwareReconciler) Promote(b reconciler.Bucket, enq func(reconciler.Bucket, types.NamespacedName)) error { func (cr *countingLeaderAwareReconciler) Promote(b reconciler.Bucket, enq func(reconciler.Bucket, types.NamespacedName)) error {
cr.promotionCount.Inc() cr.promotionCount.Add(1)
return cr.LeaderAwareFuncs.Promote(b, enq) return cr.LeaderAwareFuncs.Promote(b, enq)
} }
@ -941,7 +941,7 @@ func (cr *countingLeaderAwareReconciler) Reconcile(ctx context.Context, key stri
Namespace: namespace, Namespace: namespace,
Name: name, Name: name,
}) { }) {
cr.reconcileCount.Inc() cr.reconcileCount.Add(1)
} }
return nil return nil
} }

2
go.mod
View File

@ -29,7 +29,6 @@ require (
github.com/spf13/pflag v1.0.5 github.com/spf13/pflag v1.0.5
github.com/tsenart/vegeta/v12 v12.11.0 github.com/tsenart/vegeta/v12 v12.11.0
go.opencensus.io v0.24.0 go.opencensus.io v0.24.0
go.uber.org/atomic v1.9.0
go.uber.org/automaxprocs v1.4.0 go.uber.org/automaxprocs v1.4.0
go.uber.org/zap v1.24.0 go.uber.org/zap v1.24.0
golang.org/x/net v0.12.0 golang.org/x/net v0.12.0
@ -91,6 +90,7 @@ require (
github.com/prometheus/procfs v0.10.1 // indirect github.com/prometheus/procfs v0.10.1 // indirect
github.com/prometheus/statsd_exporter v0.21.0 // indirect github.com/prometheus/statsd_exporter v0.21.0 // indirect
github.com/rs/dnscache v0.0.0-20211102005908-e0241e321417 // indirect github.com/rs/dnscache v0.0.0-20211102005908-e0241e321417 // indirect
go.uber.org/atomic v1.9.0 // indirect
go.uber.org/goleak v1.2.0 // indirect go.uber.org/goleak v1.2.0 // indirect
go.uber.org/multierr v1.6.0 // indirect go.uber.org/multierr v1.6.0 // indirect
golang.org/x/crypto v0.11.0 // indirect golang.org/x/crypto v0.11.0 // indirect

View File

@ -19,12 +19,12 @@ package metrics
import ( import (
"context" "context"
"net/url" "net/url"
"sync/atomic"
"time" "time"
"go.opencensus.io/stats" "go.opencensus.io/stats"
"go.opencensus.io/stats/view" "go.opencensus.io/stats/view"
"go.opencensus.io/tag" "go.opencensus.io/tag"
"go.uber.org/atomic"
"k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/metrics" "k8s.io/client-go/tools/metrics"
"k8s.io/client-go/util/workqueue" "k8s.io/client-go/util/workqueue"
@ -74,13 +74,13 @@ var _ workqueue.GaugeMetric = (*gaugeMetric)(nil)
// Inc implements CounterMetric // Inc implements CounterMetric
func (m *gaugeMetric) Inc() { func (m *gaugeMetric) Inc() {
total := m.total.Inc() total := m.total.Add(1)
Record(context.Background(), m.measure.M(total), stats.WithTags(m.mutators...)) Record(context.Background(), m.measure.M(total), stats.WithTags(m.mutators...))
} }
// Dec implements GaugeMetric // Dec implements GaugeMetric
func (m *gaugeMetric) Dec() { func (m *gaugeMetric) Dec() {
total := m.total.Dec() total := m.total.Add(-1)
Record(context.Background(), m.measure.M(total), stats.WithTags(m.mutators...)) Record(context.Background(), m.measure.M(total), stats.WithTags(m.mutators...))
} }

View File

@ -22,9 +22,9 @@ import (
"net/http/pprof" "net/http/pprof"
"os" "os"
"strconv" "strconv"
"sync/atomic"
"time" "time"
"go.uber.org/atomic"
"go.uber.org/zap" "go.uber.org/zap"
corev1 "k8s.io/api/core/v1" corev1 "k8s.io/api/core/v1"
) )
@ -63,8 +63,11 @@ func NewHandler(logger *zap.SugaredLogger, enableProfiling bool) *Handler {
mux.HandleFunc(pprofPrefix+"trace", pprof.Trace) mux.HandleFunc(pprofPrefix+"trace", pprof.Trace)
logger.Info("Profiling enabled: ", enableProfiling) logger.Info("Profiling enabled: ", enableProfiling)
var enabled atomic.Bool
enabled.Store(enableProfiling)
return &Handler{ return &Handler{
enabled: atomic.NewBool(enableProfiling), enabled: &enabled,
handler: mux, handler: mux,
log: logger, log: logger,
} }

View File

@ -18,11 +18,10 @@ package testing
import ( import (
"context" "context"
"sync/atomic"
"testing" "testing"
"time" "time"
"go.uber.org/atomic"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema" "k8s.io/apimachinery/pkg/runtime/schema"
@ -94,7 +93,7 @@ func RunAndSyncInformers(ctx context.Context, informers ...controller.Informer)
c.PrependReactor("list", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) { c.PrependReactor("list", "*", func(action clientgotesting.Action) (handled bool, ret runtime.Object, err error) {
// Every list (before actual informer usage) is going to be followed by a Watch call. // Every list (before actual informer usage) is going to be followed by a Watch call.
watchesPending.Inc() watchesPending.Add(1)
return false, nil, nil return false, nil, nil
}) })
@ -108,7 +107,7 @@ func RunAndSyncInformers(ctx context.Context, informers ...controller.Informer)
return false, nil, err return false, nil, err
} }
watchesPending.Dec() watchesPending.Add(-1)
return true, watch, nil return true, watch, nil
}) })

View File

@ -20,9 +20,9 @@ package testing
import ( import (
"errors" "errors"
"sync" "sync"
"sync/atomic"
"time" "time"
"go.uber.org/atomic"
"k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/runtime"
kubetesting "k8s.io/client-go/testing" kubetesting "k8s.io/client-go/testing"
) )
@ -94,16 +94,18 @@ type Hooks struct {
// more fake clients and wait for all hooks to complete. // more fake clients and wait for all hooks to complete.
// TODO(grantr): Allow validating that a hook never fires // TODO(grantr): Allow validating that a hook never fires
func NewHooks() *Hooks { func NewHooks() *Hooks {
var ci atomic.Int32
ci.Store(-1)
return &Hooks{ return &Hooks{
completionCh: make(chan int32, 100), completionCh: make(chan int32, 100),
completionIndex: atomic.NewInt32(-1), completionIndex: &ci,
} }
} }
// OnCreate attaches a create hook to the given Fake. The hook function is // OnCreate attaches a create hook to the given Fake. The hook function is
// executed every time a resource of the given type is created. // executed every time a resource of the given type is created.
func (h *Hooks) OnCreate(fake *kubetesting.Fake, resource string, rf CreateHookFunc) { func (h *Hooks) OnCreate(fake *kubetesting.Fake, resource string, rf CreateHookFunc) {
index := h.completionIndex.Inc() index := h.completionIndex.Add(1)
fake.PrependReactor("create", resource, func(a kubetesting.Action) (bool, runtime.Object, error) { fake.PrependReactor("create", resource, func(a kubetesting.Action) (bool, runtime.Object, error) {
obj := a.(kubetesting.CreateActionImpl).Object obj := a.(kubetesting.CreateActionImpl).Object
@ -119,7 +121,7 @@ func (h *Hooks) OnCreate(fake *kubetesting.Fake, resource string, rf CreateHookF
// OnUpdate attaches an update hook to the given Fake. The hook function is // OnUpdate attaches an update hook to the given Fake. The hook function is
// executed every time a resource of the given type is updated. // executed every time a resource of the given type is updated.
func (h *Hooks) OnUpdate(fake *kubetesting.Fake, resource string, rf UpdateHookFunc) { func (h *Hooks) OnUpdate(fake *kubetesting.Fake, resource string, rf UpdateHookFunc) {
index := h.completionIndex.Inc() index := h.completionIndex.Add(1)
fake.PrependReactor("update", resource, func(a kubetesting.Action) (bool, runtime.Object, error) { fake.PrependReactor("update", resource, func(a kubetesting.Action) (bool, runtime.Object, error) {
obj := a.(kubetesting.UpdateActionImpl).Object obj := a.(kubetesting.UpdateActionImpl).Object
@ -135,7 +137,7 @@ func (h *Hooks) OnUpdate(fake *kubetesting.Fake, resource string, rf UpdateHookF
// OnDelete attaches a delete hook to the given Fake. The hook function is // OnDelete attaches a delete hook to the given Fake. The hook function is
// executed every time a resource of the given type is deleted. // executed every time a resource of the given type is deleted.
func (h *Hooks) OnDelete(fake *kubetesting.Fake, resource string, rf DeleteHookFunc) { func (h *Hooks) OnDelete(fake *kubetesting.Fake, resource string, rf DeleteHookFunc) {
index := h.completionIndex.Inc() index := h.completionIndex.Add(1)
fake.PrependReactor("delete", resource, func(a kubetesting.Action) (bool, runtime.Object, error) { fake.PrependReactor("delete", resource, func(a kubetesting.Action) (bool, runtime.Object, error) {
name := a.(kubetesting.DeleteActionImpl).Name name := a.(kubetesting.DeleteActionImpl).Name
@ -173,7 +175,7 @@ func (h *Hooks) WaitForHooks(timeout time.Duration) error {
case i := <-h.completionCh: case i := <-h.completionCh:
hookCompletions[i] = HookComplete hookCompletions[i] = HookComplete
if len(hookCompletions) == ci { if len(hookCompletions) == ci {
h.completionIndex.Dec() h.completionIndex.Add(-1)
return nil return nil
} }
case <-timer: case <-timer:

View File

@ -24,11 +24,10 @@ import (
"fmt" "fmt"
"net/http" "net/http"
"net/url" "net/url"
"sync/atomic"
"time" "time"
"testing" "testing"
"go.uber.org/atomic"
) )
var ( var (
@ -49,7 +48,7 @@ type fakeTransport struct {
} }
func (ft *fakeTransport) RoundTrip(req *http.Request) (*http.Response, error) { func (ft *fakeTransport) RoundTrip(req *http.Request) (*http.Response, error) {
call := ft.calls.Inc() call := ft.calls.Add(1)
if ft.response != nil && call == 2 { if ft.response != nil && call == 2 {
// If both a response and an error is defined, we return just the response on // If both a response and an error is defined, we return just the response on
// the second call to simulate a retry that passes eventually. // the second call to simulate a retry that passes eventually.
@ -159,7 +158,7 @@ func TestSpoofingClient_WaitForEndpointState(t *testing.T) {
inState: func() ResponseChecker { inState: func() ResponseChecker {
var calls atomic.Int32 var calls atomic.Int32
return func(resp *Response) (done bool, err error) { return func(resp *Response) (done bool, err error) {
val := calls.Inc() val := calls.Add(1)
// Stop the looping on the third invocation // Stop the looping on the third invocation
return val == 3, nil return val == 3, nil
} }

View File

@ -28,10 +28,10 @@ import (
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
"sync/atomic"
"testing" "testing"
"time" "time"
"go.uber.org/atomic"
tracingconfig "knative.dev/pkg/tracing/config" tracingconfig "knative.dev/pkg/tracing/config"
"github.com/openzipkin/zipkin-go/model" "github.com/openzipkin/zipkin-go/model"