test: use channelz instead of stats handler to determine RPC count (#5275)

This commit is contained in:
Easwar Swaminathan 2022-03-28 10:47:08 -07:00 committed by GitHub
parent e63e1230fd
commit 562e12f07b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 25 additions and 69 deletions

View File

@ -29,14 +29,13 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/grpcsync"
"google.golang.org/grpc/internal/channelz"
imetadata "google.golang.org/grpc/internal/metadata"
"google.golang.org/grpc/internal/stubserver"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/resolver/manual"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/status"
testgrpc "google.golang.org/grpc/test/grpc_testing"
testpb "google.golang.org/grpc/test/grpc_testing"
@ -44,47 +43,6 @@ import (
const rrServiceConfig = `{"loadBalancingConfig": [{"round_robin":{}}]}`
func statsHandlerDialOption(funcs statsHandlerFuncs) grpc.DialOption {
return grpc.WithStatsHandler(&statsHandler{funcs: funcs})
}
type statsHandlerFuncs struct {
TagRPC func(context.Context, *stats.RPCTagInfo) context.Context
HandleRPC func(context.Context, stats.RPCStats)
TagConn func(context.Context, *stats.ConnTagInfo) context.Context
HandleConn func(context.Context, stats.ConnStats)
}
type statsHandler struct {
funcs statsHandlerFuncs
}
func (s *statsHandler) TagRPC(ctx context.Context, info *stats.RPCTagInfo) context.Context {
if s.funcs.TagRPC != nil {
return s.funcs.TagRPC(ctx, info)
}
return ctx
}
func (s *statsHandler) HandleRPC(ctx context.Context, stats stats.RPCStats) {
if s.funcs.HandleRPC != nil {
s.funcs.HandleRPC(ctx, stats)
}
}
func (s *statsHandler) TagConn(ctx context.Context, info *stats.ConnTagInfo) context.Context {
if s.funcs.TagConn != nil {
return s.funcs.TagConn(ctx, info)
}
return ctx
}
func (s *statsHandler) HandleConn(ctx context.Context, stats stats.ConnStats) {
if s.funcs.HandleConn != nil {
s.funcs.HandleConn(ctx, stats)
}
}
func checkRoundRobin(ctx context.Context, client testgrpc.TestServiceClient, addrs []resolver.Address) error {
var peer peer.Peer
// Make sure connections to all backends are up.
@ -122,6 +80,11 @@ func checkRoundRobin(ctx context.Context, client testgrpc.TestServiceClient, add
func testRoundRobinBasic(ctx context.Context, t *testing.T, opts ...grpc.DialOption) (*grpc.ClientConn, *manual.Resolver, []*stubserver.StubServer) {
t.Helper()
// Initialize channelz. Used to determine pending RPC count.
czCleanup := channelz.NewChannelzStorageForTesting()
t.Cleanup(func() { czCleanupWrapper(czCleanup, t) })
r := manual.NewBuilderWithScheme("whatever")
const backendCount = 5
@ -210,29 +173,9 @@ func (s) TestRoundRobin_AddressesRemoved(t *testing.T) {
// blocked because there are no valid backends. This test verifies that when new
// backends are added, the RPC is able to complete.
func (s) TestRoundRobin_NewAddressWhileBlocking(t *testing.T) {
// Register a stats handler which writes to `rpcCh` when an RPC is started.
// The stats handler starts writing to `rpcCh` only after `begin` has fired.
// We are not interested in being notified about initial RPCs which ensure
// that round_robin is working as expected. We are only interested in being
// notified when we have an RPC which is blocked because there are no
// backends, and will become unblocked when the resolver reports new backends.
begin := grpcsync.NewEvent()
rpcCh := make(chan struct{}, 1)
shOption := statsHandlerDialOption(statsHandlerFuncs{
HandleRPC: func(ctx context.Context, rpcStats stats.RPCStats) {
if !begin.HasFired() {
return
}
select {
case rpcCh <- struct{}{}:
default:
}
},
})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
cc, r, backends := testRoundRobinBasic(ctx, t, shOption)
cc, r, backends := testRoundRobinBasic(ctx, t)
// Send a resolver update with no addresses. This should push the channel into
// TransientFailure.
@ -243,7 +186,6 @@ func (s) TestRoundRobin_NewAddressWhileBlocking(t *testing.T) {
}
}
begin.Fire()
client := testpb.NewTestServiceClient(cc)
doneCh := make(chan struct{})
go func() {
@ -256,11 +198,25 @@ func (s) TestRoundRobin_NewAddressWhileBlocking(t *testing.T) {
close(doneCh)
}()
select {
case <-ctx.Done():
t.Fatal("Timeout when waiting for RPC to start and block")
case <-rpcCh:
// Make sure that there is one pending RPC on the ClientConn before attempting
// to push new addresses through the name resolver. If we don't do this, the
// resolver update can happen before the above goroutine gets to make the RPC.
for {
if err := ctx.Err(); err != nil {
t.Fatal(err)
}
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
t.Fatalf("there should only be one top channel, not %d", len(tcs))
}
started := tcs[0].ChannelData.CallsStarted
completed := tcs[0].ChannelData.CallsSucceeded + tcs[0].ChannelData.CallsFailed
if (started - completed) == 1 {
break
}
time.Sleep(defaultTestShortTimeout)
}
// Send a resolver update with a valid backend to push the channel to Ready
// and unblock the above RPC.
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backends[0].Address}}})