stats: add DelayedPickComplete and follow correct semantics (#8465)

This commit is contained in:
Doug Fawley 2025-07-21 11:35:33 -07:00 committed by GitHub
parent 89d228107c
commit a5e7cd6d4c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
10 changed files with 70 additions and 64 deletions

View File

@ -208,7 +208,7 @@ func NewClient(target string, opts ...DialOption) (conn *ClientConn, err error)
channelz.Infof(logger, cc.channelz, "Channel authority set to %q", cc.authority)
cc.csMgr = newConnectivityStateManager(cc.ctx, cc.channelz)
cc.pickerWrapper = newPickerWrapper(cc.dopts.copts.StatsHandlers)
cc.pickerWrapper = newPickerWrapper()
cc.metricsRecorderList = stats.NewMetricsRecorderList(cc.dopts.copts.StatsHandlers)
@ -1076,13 +1076,6 @@ func (cc *ClientConn) healthCheckConfig() *healthCheckConfig {
return cc.sc.healthCheckConfig
}
func (cc *ClientConn) getTransport(ctx context.Context, failfast bool, method string) (transport.ClientTransport, balancer.PickResult, error) {
return cc.pickerWrapper.pick(ctx, failfast, balancer.PickInfo{
Ctx: ctx,
FullMethodName: method,
})
}
func (cc *ClientConn) applyServiceConfigAndBalancer(sc *ServiceConfig, configSelector iresolver.ConfigSelector) {
if sc == nil {
// should never reach here.

View File

@ -29,7 +29,6 @@ import (
"google.golang.org/grpc/internal/channelz"
istatus "google.golang.org/grpc/internal/status"
"google.golang.org/grpc/internal/transport"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
)
@ -48,14 +47,11 @@ type pickerGeneration struct {
// actions and unblock when there's a picker update.
type pickerWrapper struct {
// If pickerGen holds a nil pointer, the pickerWrapper is closed.
pickerGen atomic.Pointer[pickerGeneration]
statsHandlers []stats.Handler // to record blocking picker calls
pickerGen atomic.Pointer[pickerGeneration]
}
func newPickerWrapper(statsHandlers []stats.Handler) *pickerWrapper {
pw := &pickerWrapper{
statsHandlers: statsHandlers,
}
func newPickerWrapper() *pickerWrapper {
pw := &pickerWrapper{}
pw.pickerGen.Store(&pickerGeneration{
blockingCh: make(chan struct{}),
})
@ -93,6 +89,12 @@ func doneChannelzWrapper(acbw *acBalancerWrapper, result *balancer.PickResult) {
}
}
type pick struct {
transport transport.ClientTransport // the selected transport
result balancer.PickResult // the contents of the pick from the LB policy
blocked bool // set if a picker call queued for a new picker
}
// pick returns the transport that will be used for the RPC.
// It may block in the following cases:
// - there's no picker
@ -100,15 +102,16 @@ func doneChannelzWrapper(acbw *acBalancerWrapper, result *balancer.PickResult) {
// - the current picker returns other errors and failfast is false.
// - the subConn returned by the current picker is not READY
// When one of these situations happens, pick blocks until the picker gets updated.
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (transport.ClientTransport, balancer.PickResult, error) {
func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.PickInfo) (pick, error) {
var ch chan struct{}
var lastPickErr error
pickBlocked := false
for {
pg := pw.pickerGen.Load()
if pg == nil {
return nil, balancer.PickResult{}, ErrClientConnClosing
return pick{}, ErrClientConnClosing
}
if pg.picker == nil {
ch = pg.blockingCh
@ -127,9 +130,9 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
}
switch ctx.Err() {
case context.DeadlineExceeded:
return nil, balancer.PickResult{}, status.Error(codes.DeadlineExceeded, errStr)
return pick{}, status.Error(codes.DeadlineExceeded, errStr)
case context.Canceled:
return nil, balancer.PickResult{}, status.Error(codes.Canceled, errStr)
return pick{}, status.Error(codes.Canceled, errStr)
}
case <-ch:
}
@ -145,9 +148,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
// In the second case, the only way it will get to this conditional is
// if there is a new picker.
if ch != nil {
for _, sh := range pw.statsHandlers {
sh.HandleRPC(ctx, &stats.PickerUpdated{})
}
pickBlocked = true
}
ch = pg.blockingCh
@ -164,7 +165,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
if istatus.IsRestrictedControlPlaneCode(st) {
err = status.Errorf(codes.Internal, "received picker error with illegal status: %v", err)
}
return nil, balancer.PickResult{}, dropError{error: err}
return pick{}, dropError{error: err}
}
// For all other errors, wait for ready RPCs should block and other
// RPCs should fail with unavailable.
@ -172,7 +173,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
lastPickErr = err
continue
}
return nil, balancer.PickResult{}, status.Error(codes.Unavailable, err.Error())
return pick{}, status.Error(codes.Unavailable, err.Error())
}
acbw, ok := pickResult.SubConn.(*acBalancerWrapper)
@ -183,9 +184,8 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer.
if t := acbw.ac.getReadyTransport(); t != nil {
if channelz.IsOn() {
doneChannelzWrapper(acbw, &pickResult)
return t, pickResult, nil
}
return t, pickResult, nil
return pick{transport: t, result: pickResult, blocked: pickBlocked}, nil
}
if pickResult.Done != nil {
// Calling done with nil error, no bytes sent and no bytes received.

View File

@ -67,16 +67,16 @@ func (p *testingPicker) Pick(balancer.PickInfo) (balancer.PickResult, error) {
}
func (s) TestBlockingPickTimeout(t *testing.T) {
bp := newPickerWrapper(nil)
bp := newPickerWrapper()
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond)
defer cancel()
if _, _, err := bp.pick(ctx, true, balancer.PickInfo{}); status.Code(err) != codes.DeadlineExceeded {
if _, err := bp.pick(ctx, true, balancer.PickInfo{}); status.Code(err) != codes.DeadlineExceeded {
t.Errorf("bp.pick returned error %v, want DeadlineExceeded", err)
}
}
func (s) TestBlockingPick(t *testing.T) {
bp := newPickerWrapper(nil)
bp := newPickerWrapper()
// All goroutines should block because picker is nil in bp.
var finishedCount uint64
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@ -85,8 +85,8 @@ func (s) TestBlockingPick(t *testing.T) {
wg.Add(goroutineCount)
for i := goroutineCount; i > 0; i-- {
go func() {
if tr, _, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || tr != testT {
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", tr, err, testT)
if pick, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || pick.transport != testT {
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", pick.transport, err, testT)
}
atomic.AddUint64(&finishedCount, 1)
wg.Done()
@ -102,7 +102,7 @@ func (s) TestBlockingPick(t *testing.T) {
}
func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
bp := newPickerWrapper(nil)
bp := newPickerWrapper()
var finishedCount uint64
bp.updatePicker(&testingPicker{err: balancer.ErrNoSubConnAvailable, maxCalled: goroutineCount})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@ -112,8 +112,8 @@ func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
wg.Add(goroutineCount)
for i := goroutineCount; i > 0; i-- {
go func() {
if tr, _, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || tr != testT {
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", tr, err, testT)
if pick, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || pick.transport != testT {
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", pick.transport, err, testT)
}
atomic.AddUint64(&finishedCount, 1)
wg.Done()
@ -129,7 +129,7 @@ func (s) TestBlockingPickNoSubAvailable(t *testing.T) {
}
func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
bp := newPickerWrapper(nil)
bp := newPickerWrapper()
bp.updatePicker(&testingPicker{err: balancer.ErrTransientFailure, maxCalled: goroutineCount})
var finishedCount uint64
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@ -140,8 +140,8 @@ func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
wg.Add(goroutineCount)
for i := goroutineCount; i > 0; i-- {
go func() {
if tr, _, err := bp.pick(ctx, false, balancer.PickInfo{}); err != nil || tr != testT {
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", tr, err, testT)
if pick, err := bp.pick(ctx, false, balancer.PickInfo{}); err != nil || pick.transport != testT {
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", pick.transport, err, testT)
}
atomic.AddUint64(&finishedCount, 1)
wg.Done()
@ -157,7 +157,7 @@ func (s) TestBlockingPickTransientWaitforready(t *testing.T) {
}
func (s) TestBlockingPickSCNotReady(t *testing.T) {
bp := newPickerWrapper(nil)
bp := newPickerWrapper()
bp.updatePicker(&testingPicker{sc: testSCNotReady, maxCalled: goroutineCount})
var finishedCount uint64
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
@ -167,8 +167,8 @@ func (s) TestBlockingPickSCNotReady(t *testing.T) {
wg.Add(goroutineCount)
for i := goroutineCount; i > 0; i-- {
go func() {
if tr, _, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || tr != testT {
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", tr, err, testT)
if pick, err := bp.pick(ctx, true, balancer.PickInfo{}); err != nil || pick.transport != testT {
t.Errorf("bp.pick returned transport: %v, error: %v, want transport: %v, error: nil", pick.transport, err, testT)
}
atomic.AddUint64(&finishedCount, 1)
wg.Done()

View File

@ -179,6 +179,7 @@ NewSubConn is deprecated:
OverrideServerName is deprecated:
RemoveSubConn is deprecated:
SecurityVersion is deprecated:
stats.PickerUpdated is deprecated:
Target is deprecated: Use the Target field in the BuildOptions instead.
UpdateAddresses is deprecated:
UpdateSubConnState is deprecated:

View File

@ -52,7 +52,7 @@ func populateSpan(rs stats.RPCStats, ai *attemptInfo) {
)
// increment previous rpc attempts applicable for next attempt
atomic.AddUint32(&ai.previousRPCAttempts, 1)
case *stats.PickerUpdated:
case *stats.DelayedPickComplete:
span.AddEvent("Delayed LB pick complete")
case *stats.InPayload:
// message id - "must be calculated as two different counters starting

View File

@ -64,15 +64,21 @@ func (s *Begin) IsClient() bool { return s.Client }
func (s *Begin) isRPCStats() {}
// PickerUpdated indicates that the LB policy provided a new picker while the
// RPC was waiting for one.
type PickerUpdated struct{}
// DelayedPickComplete indicates that the RPC is unblocked following a delay in
// selecting a connection for the call.
type DelayedPickComplete struct{}
// IsClient indicates if the stats information is from client side. Only Client
// Side interfaces with a Picker, thus always returns true.
func (*PickerUpdated) IsClient() bool { return true }
// IsClient indicates DelayedPickComplete is available on the client.
func (*DelayedPickComplete) IsClient() bool { return true }
func (*PickerUpdated) isRPCStats() {}
func (*DelayedPickComplete) isRPCStats() {}
// PickerUpdated indicates that the RPC is unblocked following a delay in
// selecting a connection for the call.
//
// Deprecated: will be removed in a future release; use DelayedPickComplete
// instead.
type PickerUpdated = DelayedPickComplete
// InPayload contains stats about an incoming payload.
type InPayload struct {

View File

@ -469,8 +469,9 @@ func (cs *clientStream) newAttemptLocked(isTransparent bool) (*csAttempt, error)
func (a *csAttempt) getTransport() error {
cs := a.cs
var err error
a.transport, a.pickResult, err = cs.cc.getTransport(a.ctx, cs.callInfo.failFast, cs.callHdr.Method)
pickInfo := balancer.PickInfo{Ctx: a.ctx, FullMethodName: cs.callHdr.Method}
pick, err := cs.cc.pickerWrapper.pick(a.ctx, cs.callInfo.failFast, pickInfo)
a.transport, a.pickResult = pick.transport, pick.result
if err != nil {
if de, ok := err.(dropError); ok {
err = de.error
@ -481,6 +482,11 @@ func (a *csAttempt) getTransport() error {
if a.trInfo != nil {
a.trInfo.firstLine.SetRemoteAddr(a.transport.RemoteAddr())
}
if pick.blocked {
for _, sh := range a.statsHandlers {
sh.HandleRPC(a.ctx, &stats.DelayedPickComplete{})
}
}
return nil
}

View File

@ -6698,13 +6698,13 @@ func (s) TestRPCBlockingOnPickerStatsCall(t *testing.T) {
t.Fatalf("Unexpected error from UnaryCall: %v", err)
}
var pickerUpdatedCount uint
var delayedPickCompleteCount int
for _, stat := range sh.s {
if _, ok := stat.(*stats.PickerUpdated); ok {
pickerUpdatedCount++
if _, ok := stat.(*stats.DelayedPickComplete); ok {
delayedPickCompleteCount++
}
}
if pickerUpdatedCount != 1 {
t.Fatalf("sh.pickerUpdated count: %v, want: %v", pickerUpdatedCount, 2)
if got, want := delayedPickCompleteCount, 1; got != want {
t.Fatalf("sh.delayedPickComplete count: %v, want: %v", got, want)
}
}

View File

@ -578,7 +578,7 @@ func (*retryStatsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) conte
}
func (h *retryStatsHandler) HandleRPC(_ context.Context, s stats.RPCStats) {
// these calls come in nondeterministically - so can just ignore
if _, ok := s.(*stats.PickerUpdated); ok {
if _, ok := s.(*stats.DelayedPickComplete); ok {
return
}
h.mu.Lock()

View File

@ -46,14 +46,14 @@ func (s) TestPeerForClientStatsHandler(t *testing.T) {
// * Begin stats lack peer info (RPC starts pre-resolution).
// * PickerUpdated: no peer info (picker lacks transport details).
expectedCallouts := map[stats.RPCStats]bool{
&stats.OutPayload{}: true,
&stats.InHeader{}: true,
&stats.OutHeader{}: true,
&stats.InTrailer{}: true,
&stats.OutTrailer{}: true,
&stats.End{}: true,
&stats.Begin{}: false,
&stats.PickerUpdated{}: false,
&stats.OutPayload{}: true,
&stats.InHeader{}: true,
&stats.OutHeader{}: true,
&stats.InTrailer{}: true,
&stats.OutTrailer{}: true,
&stats.End{}: true,
&stats.Begin{}: false,
&stats.DelayedPickComplete{}: false,
}
// Start server.