balancer: Move metrics recorder from BuildOptions to ClientConn (#8027)

This commit is contained in:
Arjan Singh Bal 2025-02-06 17:33:09 +05:30 committed by GitHub
parent 3e27c175ff
commit f227ba9ba0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
12 changed files with 34 additions and 26 deletions

View File

@ -175,6 +175,12 @@ type ClientConn interface {
// Deprecated: Use the Target field in the BuildOptions instead.
Target() string
// MetricsRecorder provides the metrics recorder that balancers can use to
// record metrics. Balancer implementations which do not register metrics on
// metrics registry and record on them can ignore this method. The returned
// MetricsRecorder is guaranteed to never be nil.
MetricsRecorder() estats.MetricsRecorder
// EnforceClientConnEmbedding is included to force implementers to embed
// another implementation of this interface, allowing gRPC to add methods
// without breaking users.
@ -210,10 +216,6 @@ type BuildOptions struct {
// same resolver.Target as passed to the resolver. See the documentation for
// the resolver.Target type for details about what it contains.
Target resolver.Target
// MetricsRecorder is the metrics recorder that balancers can use to record
// metrics. Balancer implementations which do not register metrics on
// metrics registry and record on them can ignore this field.
MetricsRecorder estats.MetricsRecorder
}
// Builder creates a balancer.

View File

@ -29,7 +29,6 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/resolver"
)
@ -56,7 +55,7 @@ func (s) TestPickFirst_InitialResolverError(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := balancer.Get(Name).Build(cc, balancer.BuildOptions{MetricsRecorder: &stats.NoopMetricsRecorder{}})
bal := balancer.Get(Name).Build(cc, balancer.BuildOptions{})
defer bal.Close()
bal.ResolverError(errors.New("resolution failed: test error"))
@ -89,7 +88,7 @@ func (s) TestPickFirst_ResolverErrorinTF(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := balancer.Get(Name).Build(cc, balancer.BuildOptions{MetricsRecorder: &stats.NoopMetricsRecorder{}})
bal := balancer.Get(Name).Build(cc, balancer.BuildOptions{})
defer bal.Close()
// After sending a valid update, the LB policy should report CONNECTING.

View File

@ -120,7 +120,7 @@ func (pickfirstBuilder) Build(cc balancer.ClientConn, bo balancer.BuildOptions)
b := &pickfirstBalancer{
cc: cc,
target: bo.Target.String(),
metricsRecorder: bo.MetricsRecorder, // ClientConn will always create a Metrics Recorder.
metricsRecorder: cc.MetricsRecorder(),
subConns: resolver.NewAddressMap(),
state: connectivity.Connecting,

View File

@ -1099,7 +1099,7 @@ func (s) TestPickFirstLeaf_InterleavingIPV4Preffered(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{MetricsRecorder: &stats.NoopMetricsRecorder{}})
bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{})
defer bal.Close()
ccState := balancer.ClientConnState{
ResolverState: resolver.State{
@ -1145,7 +1145,7 @@ func (s) TestPickFirstLeaf_InterleavingIPv6Preffered(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{MetricsRecorder: &stats.NoopMetricsRecorder{}})
bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{})
defer bal.Close()
ccState := balancer.ClientConnState{
ResolverState: resolver.State{
@ -1189,7 +1189,7 @@ func (s) TestPickFirstLeaf_InterleavingUnknownPreffered(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{MetricsRecorder: &stats.NoopMetricsRecorder{}})
bal := balancer.Get(pickfirstleaf.Name).Build(cc, balancer.BuildOptions{})
defer bal.Close()
ccState := balancer.ClientConnState{
ResolverState: resolver.State{

View File

@ -29,7 +29,6 @@ import (
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/resolver"
)
@ -196,7 +195,7 @@ func (s) TestPickFirstLeaf_TFPickerUpdate(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc := testutils.NewBalancerClientConn(t)
bal := pickfirstBuilder{}.Build(cc, balancer.BuildOptions{MetricsRecorder: &stats.NoopMetricsRecorder{}})
bal := pickfirstBuilder{}.Build(cc, balancer.BuildOptions{})
defer bal.Close()
ccState := balancer.ClientConnState{
ResolverState: resolver.State{

View File

@ -140,7 +140,7 @@ func (rlsBB) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.
updateCh: buffer.NewUnbounded(),
}
lb.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-experimental-lb %p] ", lb))
lb.dataCache = newDataCache(maxCacheSize, lb.logger, opts.MetricsRecorder, opts.Target.String())
lb.dataCache = newDataCache(maxCacheSize, lb.logger, cc.MetricsRecorder(), opts.Target.String())
lb.bg = balancergroup.New(balancergroup.Options{
CC: cc,
BuildOpts: opts,
@ -539,7 +539,7 @@ func (b *rlsBalancer) sendNewPickerLocked() {
bg: b.bg,
rlsServerTarget: b.lbCfg.lookupService,
grpcTarget: b.bopts.Target.String(),
metricsRecorder: b.bopts.MetricsRecorder,
metricsRecorder: b.cc.MetricsRecorder(),
}
picker.logger = internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf("[rls-picker %p] ", picker))
state := balancer.State{

View File

@ -94,7 +94,7 @@ func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Ba
b := &wrrBalancer{
ClientConn: cc,
target: bOpts.Target.String(),
metricsRecorder: bOpts.MetricsRecorder,
metricsRecorder: cc.MetricsRecorder(),
addressWeights: resolver.NewAddressMap(),
endpointToWeight: resolver.NewEndpointMap(),
scToWeight: make(map[balancer.SubConn]*endpointWeight),

View File

@ -26,6 +26,7 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/balancer/gracefulswitch"
"google.golang.org/grpc/internal/channelz"
@ -93,7 +94,6 @@ func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper {
CustomUserAgent: cc.dopts.copts.UserAgent,
ChannelzParent: cc.channelz,
Target: cc.parsedTarget,
MetricsRecorder: cc.metricsRecorderList,
},
serializer: grpcsync.NewCallbackSerializer(ctx),
serializerCancel: cancel,
@ -102,6 +102,10 @@ func newCCBalancerWrapper(cc *ClientConn) *ccBalancerWrapper {
return ccb
}
func (ccb *ccBalancerWrapper) MetricsRecorder() stats.MetricsRecorder {
return ccb.cc.metricsRecorderList
}
// updateClientConnState is invoked by grpc to push a ClientConnState update to
// the underlying balancer. This is always executed from the serializer, so
// it is safe to call into the balancer here.

View File

@ -33,7 +33,6 @@ import (
"google.golang.org/grpc/internal/channelz"
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/resolver"
)
@ -604,7 +603,6 @@ func (s) TestBalancerGracefulSwitch(t *testing.T) {
childPolicyName := t.Name()
stub.Register(childPolicyName, stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
bd.BuildOptions.MetricsRecorder = &stats.NoopMetricsRecorder{}
bd.Data = balancer.Get(pickfirst.Name).Build(bd.ClientConn, bd.BuildOptions)
},
Close: func(bd *stub.BalancerData) {

View File

@ -110,11 +110,11 @@ func (recordingLoadBalancerBuilder) Name() string {
}
func (recordingLoadBalancerBuilder) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
intCountHandle.Record(bOpts.MetricsRecorder, 1, "int counter label val", "int counter optional label val")
floatCountHandle.Record(bOpts.MetricsRecorder, 2, "float counter label val", "float counter optional label val")
intHistoHandle.Record(bOpts.MetricsRecorder, 3, "int histo label val", "int histo optional label val")
floatHistoHandle.Record(bOpts.MetricsRecorder, 4, "float histo label val", "float histo optional label val")
intGaugeHandle.Record(bOpts.MetricsRecorder, 5, "int gauge label val", "int gauge optional label val")
intCountHandle.Record(cc.MetricsRecorder(), 1, "int counter label val", "int counter optional label val")
floatCountHandle.Record(cc.MetricsRecorder(), 2, "float counter label val", "float counter optional label val")
intHistoHandle.Record(cc.MetricsRecorder(), 3, "int histo label val", "int histo optional label val")
floatHistoHandle.Record(cc.MetricsRecorder(), 4, "float histo label val", "float histo optional label val")
intGaugeHandle.Record(cc.MetricsRecorder(), 5, "int gauge label val", "int gauge optional label val")
return &recordingLoadBalancer{
Balancer: balancer.Get(pickfirst.Name).Build(cc, bOpts),

View File

@ -26,9 +26,12 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/resolver"
istats "google.golang.org/grpc/internal/stats"
)
// TestSubConn implements the SubConn interface, to be used in tests.
@ -155,6 +158,11 @@ func (tcc *BalancerClientConn) NewSubConn(a []resolver.Address, o balancer.NewSu
return sc, nil
}
// MetricsRecorder returns an empty MetricsRecorderList.
func (*BalancerClientConn) MetricsRecorder() stats.MetricsRecorder {
return istats.NewMetricsRecorderList(nil)
}
// RemoveSubConn is a nop; tests should all be updated to use sc.Shutdown()
// instead.
func (tcc *BalancerClientConn) RemoveSubConn(sc balancer.SubConn) {

View File

@ -34,7 +34,6 @@ import (
"google.golang.org/grpc/internal/grpctest"
"google.golang.org/grpc/internal/hierarchy"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/internal/testutils/stats"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/status"
)
@ -644,7 +643,6 @@ func TestClusterGracefulSwitch(t *testing.T) {
childPolicyName := t.Name()
stub.Register(childPolicyName, stub.BalancerFuncs{
Init: func(bd *stub.BalancerData) {
bd.BuildOptions.MetricsRecorder = &stats.NoopMetricsRecorder{}
bd.Data = balancer.Get(pickfirst.Name).Build(bd.ClientConn, bd.BuildOptions)
},
Close: func(bd *stub.BalancerData) {