diff --git a/clientconn.go b/clientconn.go index ef6c55aca..6007df1c0 100644 --- a/clientconn.go +++ b/clientconn.go @@ -348,7 +348,7 @@ func (cc *ClientConn) exitIdleMode() error { cc.idlenessState = ccIdlenessStateExitingIdle exitedIdle := false if cc.blockingpicker == nil { - cc.blockingpicker = newPickerWrapper() + cc.blockingpicker = newPickerWrapper(cc.dopts.copts.StatsHandlers) } else { cc.blockingpicker.exitIdleMode() exitedIdle = true diff --git a/picker_wrapper.go b/picker_wrapper.go index 02f975951..236837f41 100644 --- a/picker_wrapper.go +++ b/picker_wrapper.go @@ -28,21 +28,26 @@ import ( "google.golang.org/grpc/internal/channelz" istatus "google.golang.org/grpc/internal/status" "google.golang.org/grpc/internal/transport" + "google.golang.org/grpc/stats" "google.golang.org/grpc/status" ) // pickerWrapper is a wrapper of balancer.Picker. It blocks on certain pick // actions and unblock when there's a picker update. type pickerWrapper struct { - mu sync.Mutex - done bool - idle bool - blockingCh chan struct{} - picker balancer.Picker + mu sync.Mutex + done bool + idle bool + blockingCh chan struct{} + picker balancer.Picker + statsHandlers []stats.Handler // to record blocking picker calls } -func newPickerWrapper() *pickerWrapper { - return &pickerWrapper{blockingCh: make(chan struct{})} +func newPickerWrapper(statsHandlers []stats.Handler) *pickerWrapper { + return &pickerWrapper{ + blockingCh: make(chan struct{}), + statsHandlers: statsHandlers, + } } // updatePicker is called by UpdateBalancerState. It unblocks all blocked pick. @@ -95,6 +100,7 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer. var ch chan struct{} var lastPickErr error + for { pw.mu.Lock() if pw.done { @@ -129,6 +135,20 @@ func (pw *pickerWrapper) pick(ctx context.Context, failfast bool, info balancer. continue } + // If the channel is set, it means that the pick call had to wait for a + // new picker at some point. Either it's the first iteration and this + // function received the first picker, or a picker errored with + // ErrNoSubConnAvailable or errored with failfast set to false, which + // will trigger a continue to the next iteration. In the first case this + // conditional will hit if this call had to block (the channel is set). + // In the second case, the only way it will get to this conditional is + // if there is a new picker. + if ch != nil { + for _, sh := range pw.statsHandlers { + sh.HandleRPC(ctx, &stats.PickerUpdated{}) + } + } + ch = pw.blockingCh p := pw.picker pw.mu.Unlock() diff --git a/picker_wrapper_test.go b/picker_wrapper_test.go index a4fae85d3..ba99a06b0 100644 --- a/picker_wrapper_test.go +++ b/picker_wrapper_test.go @@ -66,7 +66,7 @@ func (p *testingPicker) Pick(info balancer.PickInfo) (balancer.PickResult, error } func (s) TestBlockingPickTimeout(t *testing.T) { - bp := newPickerWrapper() + bp := newPickerWrapper(nil) ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond) defer cancel() if _, _, err := bp.pick(ctx, true, balancer.PickInfo{}); status.Code(err) != codes.DeadlineExceeded { @@ -75,7 +75,7 @@ func (s) TestBlockingPickTimeout(t *testing.T) { } func (s) TestBlockingPick(t *testing.T) { - bp := newPickerWrapper() + bp := newPickerWrapper(nil) // All goroutines should block because picker is nil in bp. var finishedCount uint64 for i := goroutineCount; i > 0; i-- { @@ -94,7 +94,7 @@ func (s) TestBlockingPick(t *testing.T) { } func (s) TestBlockingPickNoSubAvailable(t *testing.T) { - bp := newPickerWrapper() + bp := newPickerWrapper(nil) var finishedCount uint64 bp.updatePicker(&testingPicker{err: balancer.ErrNoSubConnAvailable, maxCalled: goroutineCount}) // All goroutines should block because picker returns no subConn available. @@ -114,7 +114,7 @@ func (s) TestBlockingPickNoSubAvailable(t *testing.T) { } func (s) TestBlockingPickTransientWaitforready(t *testing.T) { - bp := newPickerWrapper() + bp := newPickerWrapper(nil) bp.updatePicker(&testingPicker{err: balancer.ErrTransientFailure, maxCalled: goroutineCount}) var finishedCount uint64 // All goroutines should block because picker returns transientFailure and @@ -135,7 +135,7 @@ func (s) TestBlockingPickTransientWaitforready(t *testing.T) { } func (s) TestBlockingPickSCNotReady(t *testing.T) { - bp := newPickerWrapper() + bp := newPickerWrapper(nil) bp.updatePicker(&testingPicker{sc: testSCNotReady, maxCalled: goroutineCount}) var finishedCount uint64 // All goroutines should block because subConn is not ready. diff --git a/stats/opencensus/trace.go b/stats/opencensus/trace.go index f41cb838a..5a5438e1c 100644 --- a/stats/opencensus/trace.go +++ b/stats/opencensus/trace.go @@ -99,9 +99,11 @@ func populateSpan(ctx context.Context, rs stats.RPCStats, ti *traceInfo) { trace.BoolAttribute("Client", rs.Client), trace.BoolAttribute("FailFast", rs.FailFast), ) + case *stats.PickerUpdated: + span.Annotate(nil, "Delayed LB pick complete") case *stats.InPayload: // message id - "must be calculated as two different counters starting - // from 1 one for sent messages and one for received messages." + // from one for sent messages and one for received messages." mi := atomic.AddUint32(&ti.countRecvMsg, 1) span.AddMessageReceiveEvent(int64(mi), int64(rs.Length), int64(rs.CompressedLength)) case *stats.OutPayload: diff --git a/stats/stats.go b/stats/stats.go index 7a552a9b7..f23b6b338 100644 --- a/stats/stats.go +++ b/stats/stats.go @@ -59,6 +59,16 @@ func (s *Begin) IsClient() bool { return s.Client } func (s *Begin) isRPCStats() {} +// PickerUpdated indicates that the LB policy provided a new picker while the +// RPC was waiting for one. +type PickerUpdated struct{} + +// IsClient indicates if the stats information is from client side. Only Client +// Side interfaces with a Picker, thus always returns true. +func (*PickerUpdated) IsClient() bool { return true } + +func (*PickerUpdated) isRPCStats() {} + // InPayload contains the information for an incoming payload. type InPayload struct { // Client is true if this InPayload is from client side. diff --git a/test/end2end_test.go b/test/end2end_test.go index 865285b35..b3604d03b 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -23,6 +23,7 @@ import ( "bytes" "context" "crypto/tls" + "encoding/json" "errors" "flag" "fmt" @@ -44,6 +45,8 @@ import ( "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" "google.golang.org/grpc" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/roundrobin" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" @@ -62,6 +65,7 @@ import ( "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" + "google.golang.org/grpc/serviceconfig" "google.golang.org/grpc/stats" "google.golang.org/grpc/status" "google.golang.org/grpc/tap" @@ -82,6 +86,7 @@ const defaultHealthService = "grpc.health.v1.Health" func init() { channelz.TurnOn() + balancer.Register(triggerRPCBlockPickerBalancerBuilder{}) } type s struct { @@ -6362,3 +6367,165 @@ func (s) TestGlobalBinaryLoggingOptions(t *testing.T) { t.Fatalf("want 8 server side binary logging events, got %v", ssbl.mml.events) } } + +type statsHandlerRecordEvents struct { + mu sync.Mutex + s []stats.RPCStats +} + +func (*statsHandlerRecordEvents) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) context.Context { + return ctx +} +func (h *statsHandlerRecordEvents) HandleRPC(_ context.Context, s stats.RPCStats) { + h.mu.Lock() + defer h.mu.Unlock() + h.s = append(h.s, s) +} +func (*statsHandlerRecordEvents) TagConn(ctx context.Context, _ *stats.ConnTagInfo) context.Context { + return ctx +} +func (*statsHandlerRecordEvents) HandleConn(context.Context, stats.ConnStats) {} + +type triggerRPCBlockPicker struct { + pickDone func() +} + +func (bp *triggerRPCBlockPicker) Pick(pi balancer.PickInfo) (balancer.PickResult, error) { + bp.pickDone() + return balancer.PickResult{}, balancer.ErrNoSubConnAvailable +} + +const name = "triggerRPCBlockBalancer" + +type triggerRPCBlockPickerBalancerBuilder struct{} + +func (triggerRPCBlockPickerBalancerBuilder) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer { + b := &triggerRPCBlockBalancer{ + blockingPickerDone: grpcsync.NewEvent(), + ClientConn: cc, + } + // round_robin child to complete balancer tree with a usable leaf policy and + // have RPCs actually work. + builder := balancer.Get(roundrobin.Name) + rr := builder.Build(b, bOpts) + if rr == nil { + panic("round robin builder returned nil") + } + b.Balancer = rr + return b +} + +func (triggerRPCBlockPickerBalancerBuilder) ParseConfig(json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { + return &bpbConfig{}, nil +} + +func (triggerRPCBlockPickerBalancerBuilder) Name() string { + return name +} + +type bpbConfig struct { + serviceconfig.LoadBalancingConfig +} + +// triggerRPCBlockBalancer uses a child RR balancer, but blocks all UpdateState +// calls until the first Pick call. That first Pick returns +// ErrNoSubConnAvailable to make the RPC block and trigger the appropriate stats +// handler callout. After the first Pick call, it will forward at least one +// READY picker update from the child, causing RPCs to proceed as normal using a +// round robin balancer's picker if it updates with a READY picker. +type triggerRPCBlockBalancer struct { + stateMu sync.Mutex + childState balancer.State + + blockingPickerDone *grpcsync.Event + // embed a ClientConn to wrap only UpdateState() operation + balancer.ClientConn + // embed a Balancer to wrap only UpdateClientConnState() operation + balancer.Balancer +} + +func (bpb *triggerRPCBlockBalancer) UpdateClientConnState(s balancer.ClientConnState) error { + err := bpb.Balancer.UpdateClientConnState(s) + bpb.ClientConn.UpdateState(balancer.State{ + ConnectivityState: connectivity.Connecting, + Picker: &triggerRPCBlockPicker{ + pickDone: func() { + bpb.stateMu.Lock() + defer bpb.stateMu.Unlock() + bpb.blockingPickerDone.Fire() + if bpb.childState.ConnectivityState == connectivity.Ready { + bpb.ClientConn.UpdateState(bpb.childState) + } + }, + }, + }) + return err +} + +func (bpb *triggerRPCBlockBalancer) UpdateState(state balancer.State) { + bpb.stateMu.Lock() + defer bpb.stateMu.Unlock() + bpb.childState = state + if bpb.blockingPickerDone.HasFired() { // guard first one to get a picker sending ErrNoSubConnAvailable first + if state.ConnectivityState == connectivity.Ready { + bpb.ClientConn.UpdateState(state) // after the first rr picker update, only forward once READY for deterministic picker counts + } + } +} + +// TestRPCBlockingOnPickerStatsCall tests the emission of a stats handler call +// that represents the RPC had to block waiting for a new picker due to +// ErrNoSubConnAvailable being returned from the first picker call. +func (s) TestRPCBlockingOnPickerStatsCall(t *testing.T) { + sh := &statsHandlerRecordEvents{} + ss := &stubserver.StubServer{ + UnaryCallF: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { + return &testpb.SimpleResponse{}, nil + }, + } + + if err := ss.StartServer(); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) + } + defer ss.Stop() + + lbCfgJSON := `{ + "loadBalancingConfig": [ + { + "triggerRPCBlockBalancer": {} + } + ] + }` + + sc := internal.ParseServiceConfig.(func(string) *serviceconfig.ParseResult)(lbCfgJSON) + mr := manual.NewBuilderWithScheme("pickerupdatedbalancer") + defer mr.Close() + mr.InitialState(resolver.State{ + Addresses: []resolver.Address{ + {Addr: ss.Address}, + }, + ServiceConfig: sc, + }) + + cc, err := grpc.Dial(mr.Scheme()+":///", grpc.WithResolvers(mr), grpc.WithStatsHandler(sh), grpc.WithTransportCredentials(insecure.NewCredentials())) + if err != nil { + t.Fatalf("grpc.Dial() failed: %v", err) + } + defer cc.Close() + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + testServiceClient := testgrpc.NewTestServiceClient(cc) + if _, err := testServiceClient.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil { + t.Fatalf("Unexpected error from UnaryCall: %v", err) + } + + var pickerUpdatedCount uint + for _, stat := range sh.s { + if _, ok := stat.(*stats.PickerUpdated); ok { + pickerUpdatedCount++ + } + } + if pickerUpdatedCount != 1 { + t.Fatalf("sh.pickerUpdated count: %v, want: %v", pickerUpdatedCount, 2) + } +} diff --git a/test/retry_test.go b/test/retry_test.go index 06e2479ff..de274cbc3 100644 --- a/test/retry_test.go +++ b/test/retry_test.go @@ -474,6 +474,10 @@ func (*retryStatsHandler) TagRPC(ctx context.Context, _ *stats.RPCTagInfo) conte return ctx } func (h *retryStatsHandler) HandleRPC(_ context.Context, s stats.RPCStats) { + // these calls come in nondeterministically - so can just ignore + if _, ok := s.(*stats.PickerUpdated); ok { + return + } h.mu.Lock() h.s = append(h.s, s) h.mu.Unlock()