From 562e12f07b7faa6a330bf48db1b0036b06cecca5 Mon Sep 17 00:00:00 2001 From: Easwar Swaminathan Date: Mon, 28 Mar 2022 10:47:08 -0700 Subject: [PATCH] test: use channelz instead of stats handler to determine RPC count (#5275) --- test/roundrobin_test.go | 94 +++++++++++------------------------------ 1 file changed, 25 insertions(+), 69 deletions(-) diff --git a/test/roundrobin_test.go b/test/roundrobin_test.go index 557a47f77..a7b94197f 100644 --- a/test/roundrobin_test.go +++ b/test/roundrobin_test.go @@ -29,14 +29,13 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials/insecure" - "google.golang.org/grpc/internal/grpcsync" + "google.golang.org/grpc/internal/channelz" imetadata "google.golang.org/grpc/internal/metadata" "google.golang.org/grpc/internal/stubserver" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" "google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver/manual" - "google.golang.org/grpc/stats" "google.golang.org/grpc/status" testgrpc "google.golang.org/grpc/test/grpc_testing" testpb "google.golang.org/grpc/test/grpc_testing" @@ -44,47 +43,6 @@ import ( const rrServiceConfig = `{"loadBalancingConfig": [{"round_robin":{}}]}` -func statsHandlerDialOption(funcs statsHandlerFuncs) grpc.DialOption { - return grpc.WithStatsHandler(&statsHandler{funcs: funcs}) -} - -type statsHandlerFuncs struct { - TagRPC func(context.Context, *stats.RPCTagInfo) context.Context - HandleRPC func(context.Context, stats.RPCStats) - TagConn func(context.Context, *stats.ConnTagInfo) context.Context - HandleConn func(context.Context, stats.ConnStats) -} - -type statsHandler struct { - funcs statsHandlerFuncs -} - -func (s *statsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context { - if s.funcs.TagRPC != nil { - return s.funcs.TagRPC(ctx, info) - } - return ctx -} - -func (s *statsHandler) HandleRPC(ctx context.Context, stats stats.RPCStats) { - if s.funcs.HandleRPC != nil { - s.funcs.HandleRPC(ctx, stats) - } -} - -func (s *statsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context { - if s.funcs.TagConn != nil { - return s.funcs.TagConn(ctx, info) - } - return ctx -} - -func (s *statsHandler) HandleConn(ctx context.Context, stats stats.ConnStats) { - if s.funcs.HandleConn != nil { - s.funcs.HandleConn(ctx, stats) - } -} - func checkRoundRobin(ctx context.Context, client testgrpc.TestServiceClient, addrs []resolver.Address) error { var peer peer.Peer // Make sure connections to all backends are up. @@ -122,6 +80,11 @@ func checkRoundRobin(ctx context.Context, client testgrpc.TestServiceClient, add func testRoundRobinBasic(ctx context.Context, t *testing.T, opts ...grpc.DialOption) (*grpc.ClientConn, *manual.Resolver, []*stubserver.StubServer) { t.Helper() + + // Initialize channelz. Used to determine pending RPC count. + czCleanup := channelz.NewChannelzStorageForTesting() + t.Cleanup(func() { czCleanupWrapper(czCleanup, t) }) + r := manual.NewBuilderWithScheme("whatever") const backendCount = 5 @@ -210,29 +173,9 @@ func (s) TestRoundRobin_AddressesRemoved(t *testing.T) { // blocked because there are no valid backends. This test verifies that when new // backends are added, the RPC is able to complete. func (s) TestRoundRobin_NewAddressWhileBlocking(t *testing.T) { - // Register a stats handler which writes to `rpcCh` when an RPC is started. - // The stats handler starts writing to `rpcCh` only after `begin` has fired. - // We are not interested in being notified about initial RPCs which ensure - // that round_robin is working as expected. We are only interested in being - // notified when we have an RPC which is blocked because there are no - // backends, and will become unblocked when the resolver reports new backends. - begin := grpcsync.NewEvent() - rpcCh := make(chan struct{}, 1) - shOption := statsHandlerDialOption(statsHandlerFuncs{ - HandleRPC: func(ctx context.Context, rpcStats stats.RPCStats) { - if !begin.HasFired() { - return - } - select { - case rpcCh <- struct{}{}: - default: - } - }, - }) - ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) defer cancel() - cc, r, backends := testRoundRobinBasic(ctx, t, shOption) + cc, r, backends := testRoundRobinBasic(ctx, t) // Send a resolver update with no addresses. This should push the channel into // TransientFailure. @@ -243,7 +186,6 @@ func (s) TestRoundRobin_NewAddressWhileBlocking(t *testing.T) { } } - begin.Fire() client := testpb.NewTestServiceClient(cc) doneCh := make(chan struct{}) go func() { @@ -256,11 +198,25 @@ func (s) TestRoundRobin_NewAddressWhileBlocking(t *testing.T) { close(doneCh) }() - select { - case <-ctx.Done(): - t.Fatal("Timeout when waiting for RPC to start and block") - case <-rpcCh: + // Make sure that there is one pending RPC on the ClientConn before attempting + // to push new addresses through the name resolver. If we don't do this, the + // resolver update can happen before the above goroutine gets to make the RPC. + for { + if err := ctx.Err(); err != nil { + t.Fatal(err) + } + tcs, _ := channelz.GetTopChannels(0, 0) + if len(tcs) != 1 { + t.Fatalf("there should only be one top channel, not %d", len(tcs)) + } + started := tcs[0].ChannelData.CallsStarted + completed := tcs[0].ChannelData.CallsSucceeded + tcs[0].ChannelData.CallsFailed + if (started - completed) == 1 { + break + } + time.Sleep(defaultTestShortTimeout) } + // Send a resolver update with a valid backend to push the channel to Ready // and unblock the above RPC. r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backends[0].Address}}})