test: remove funcServer and some uses of NewTestServiceService (#3867)

This commit is contained in:
Doug Fawley 2020-09-09 13:23:46 -07:00 committed by GitHub
parent 8630cac324
commit 15157e2664
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 180 additions and 189 deletions

View File

@ -150,7 +150,7 @@ func (s) TestCredsBundleFromBalancer(t *testing.T) {
te.customServerOptions = []grpc.ServerOption{
grpc.Creds(creds),
}
te.startServer(&testServer{})
te.startServer((&testServer{}).Svc())
defer te.tearDown()
cc := te.clientConn()
@ -179,7 +179,7 @@ func testPickExtraMetadata(t *testing.T, e env) {
grpc.WithBalancerName(testBalancerName),
grpc.WithUserAgent(testUserAgent),
}
te.startServer(&testServer{security: e.security})
te.startServer((&testServer{security: e.security}).Svc())
defer te.tearDown()
// Set resolver to xds to trigger the extra metadata code path.
@ -228,7 +228,7 @@ func testDoneInfo(t *testing.T, e env) {
grpc.WithBalancerName(testBalancerName),
}
te.userAgent = failAppUA
te.startServer(&testServer{security: e.security})
te.startServer((&testServer{security: e.security}).Svc())
defer te.tearDown()
cc := te.clientConn()
@ -498,7 +498,7 @@ func (s) TestAddressAttributesInNewSubConn(t *testing.T) {
}
s := grpc.NewServer()
testpb.RegisterTestServiceService(s, testpb.NewTestServiceService(&testServer{}))
testpb.RegisterTestServiceService(s, testServer{}.Svc())
go s.Serve(lis)
defer s.Stop()
t.Logf("Started gRPC server at %s...", lis.Addr().String())
@ -556,12 +556,12 @@ func (s) TestServersSwap(t *testing.T) {
t.Fatalf("Error while listening. Err: %v", err)
}
s := grpc.NewServer()
ts := &funcServer{
unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
ts := &testpb.TestServiceService{
UnaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{Username: username}, nil
},
}
testpb.RegisterTestServiceService(s, testpb.NewTestServiceService(ts))
testpb.RegisterTestServiceService(s, ts)
go s.Serve(lis)
return lis.Addr().String(), s.Stop
}
@ -616,12 +616,12 @@ func (s) TestEmptyAddrs(t *testing.T) {
s := grpc.NewServer()
defer s.Stop()
const one = "1"
ts := &funcServer{
unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
ts := &testpb.TestServiceService{
UnaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{Username: one}, nil
},
}
testpb.RegisterTestServiceService(s, testpb.NewTestServiceService(ts))
testpb.RegisterTestServiceService(s, ts)
go s.Serve(lis)
// Initialize pickfirst client
@ -705,12 +705,12 @@ func (s) TestWaitForReady(t *testing.T) {
s := grpc.NewServer()
defer s.Stop()
const one = "1"
ts := &funcServer{
unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
ts := &testpb.TestServiceService{
UnaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{Username: one}, nil
},
}
testpb.RegisterTestServiceService(s, testpb.NewTestServiceService(ts))
testpb.RegisterTestServiceService(s, ts)
go s.Serve(lis)
// Initialize client

View File

@ -43,7 +43,7 @@ func testCZSocketMetricsSocketOption(t *testing.T, e env) {
czCleanup := channelz.NewChannelzStorage()
defer czCleanupWrapper(czCleanup, t)
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
te.startServer((&testServer{security: e.security}).Svc())
defer te.tearDown()
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)

View File

@ -85,7 +85,7 @@ func (s) TestCZServerRegistrationAndDeletion(t *testing.T) {
defer czCleanupWrapper(czCleanup, t)
e := tcpClearRREnv
te := newTest(t, e)
te.startServers(&testServer{security: e.security}, c.total)
te.startServers(testServer{security: e.security}.Svc(), c.total)
ss, end := channelz.GetServers(c.start, c.max)
if int64(len(ss)) != c.length || end != c.end {
@ -104,7 +104,7 @@ func (s) TestCZGetServer(t *testing.T) {
defer czCleanupWrapper(czCleanup, t)
e := tcpClearRREnv
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
ss, _ := channelz.GetServers(0, 0)
@ -253,7 +253,7 @@ func (s) TestCZClientSubChannelSocketRegistrationAndDeletion(t *testing.T) {
num := 3 // number of backends
te := newTest(t, e)
var svrAddrs []resolver.Address
te.startServers(&testServer{security: e.security}, num)
te.startServers(testServer{security: e.security}.Svc(), num)
r := manual.NewBuilderWithScheme("whatever")
for _, a := range te.srvAddrs {
svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
@ -339,7 +339,7 @@ func (s) TestCZServerSocketRegistrationAndDeletion(t *testing.T) {
defer czCleanupWrapper(czCleanup, t)
e := tcpClearRREnv
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
var ccs []*grpc.ClientConn
for i := 0; i < c.total; i++ {
cc := te.clientConn()
@ -504,7 +504,7 @@ func (s) TestCZChannelMetrics(t *testing.T) {
te := newTest(t, e)
te.maxClientSendMsgSize = newInt(8)
var svrAddrs []resolver.Address
te.startServers(&testServer{security: e.security}, num)
te.startServers(testServer{security: e.security}.Svc(), num)
r := manual.NewBuilderWithScheme("whatever")
for _, a := range te.srvAddrs {
svrAddrs = append(svrAddrs, resolver.Address{Addr: a})
@ -590,7 +590,7 @@ func (s) TestCZServerMetrics(t *testing.T) {
e := tcpClearRREnv
te := newTest(t, e)
te.maxServerReceiveMsgSize = newInt(8)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
@ -861,7 +861,7 @@ func (s) TestCZClientSocketMetricsStreamsAndMessagesCount(t *testing.T) {
te := newTest(t, e)
te.maxServerReceiveMsgSize = newInt(20)
te.maxClientReceiveMsgSize = newInt(20)
rcw := te.startServerWithConnControl(&testServer{security: e.security})
rcw := te.startServerWithConnControl(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)}
@ -963,7 +963,7 @@ func (s) TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *t
// Avoid overflowing connection level flow control window, which will lead to
// transport being closed.
te.serverInitialConnWindowSize = 65536 * 2
ts := &funcServer{fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
ts := &testpb.TestServiceService{FullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
stream.Send(&testpb.StreamingOutputCallResponse{})
<-stream.Context().Done()
return status.Errorf(codes.DeadlineExceeded, "deadline exceeded or cancelled")
@ -1048,7 +1048,7 @@ func (s) TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) {
te.serverInitialConnWindowSize = 65536
te.clientInitialWindowSize = 65536
te.clientInitialConnWindowSize = 65536
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
@ -1169,7 +1169,7 @@ func (s) TestCZClientSocketMetricsKeepAlive(t *testing.T) {
MinTime: 500 * time.Millisecond,
PermitWithoutStream: true,
}))
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
te.clientConn() // Dial the server
defer te.tearDown()
if err := verifyResultWithDelay(func() (bool, error) {
@ -1211,7 +1211,7 @@ func (s) TestCZServerSocketMetricsStreamsAndMessagesCount(t *testing.T) {
te := newTest(t, e)
te.maxServerReceiveMsgSize = newInt(20)
te.maxClientReceiveMsgSize = newInt(20)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc, _ := te.clientConnWithConnControl()
tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)}
@ -1282,7 +1282,7 @@ func (s) TestCZServerSocketMetricsKeepAlive(t *testing.T) {
Timeout: 100 * time.Millisecond,
})
te.customServerOptions = append(te.customServerOptions, kpOption)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
@ -1342,7 +1342,7 @@ func (s) TestCZSocketGetSecurityValueTLS(t *testing.T) {
defer czCleanupWrapper(czCleanup, t)
e := tcpTLSRREnv
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
te.clientConn()
if err := verifyResultWithDelay(func() (bool, error) {
@ -1467,7 +1467,7 @@ func (s) TestCZSubChannelTraceCreationDeletion(t *testing.T) {
defer czCleanupWrapper(czCleanup, t)
e := tcpClearRREnv
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
r := manual.NewBuilderWithScheme("whatever")
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
te.resolverScheme = r.Scheme()
@ -1560,7 +1560,7 @@ func (s) TestCZChannelAddressResolutionChange(t *testing.T) {
e := tcpClearRREnv
e.balancer = ""
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
r := manual.NewBuilderWithScheme("whatever")
addrs := []resolver.Address{{Addr: te.srvAddr}}
r.InitialState(resolver.State{Addresses: addrs})
@ -1663,7 +1663,7 @@ func (s) TestCZSubChannelPickedNewAddress(t *testing.T) {
e := tcpClearRREnv
e.balancer = ""
te := newTest(t, e)
te.startServers(&testServer{security: e.security}, 3)
te.startServers(testServer{security: e.security}.Svc(), 3)
r := manual.NewBuilderWithScheme("whatever")
var svrAddrs []resolver.Address
for _, a := range te.srvAddrs {
@ -1722,7 +1722,7 @@ func (s) TestCZSubChannelConnectivityState(t *testing.T) {
defer czCleanupWrapper(czCleanup, t)
e := tcpClearRREnv
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
r := manual.NewBuilderWithScheme("whatever")
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
te.resolverScheme = r.Scheme()
@ -1816,7 +1816,7 @@ func (s) TestCZChannelConnectivityState(t *testing.T) {
defer czCleanupWrapper(czCleanup, t)
e := tcpClearRREnv
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
r := manual.NewBuilderWithScheme("whatever")
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
te.resolverScheme = r.Scheme()
@ -1939,7 +1939,7 @@ func (s) TestCZTraceOverwriteSubChannelDeletion(t *testing.T) {
te := newTest(t, e)
channelz.SetMaxTraceEntry(1)
defer channelz.ResetMaxTraceEntryToDefault()
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
r := manual.NewBuilderWithScheme("whatever")
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
te.resolverScheme = r.Scheme()
@ -1997,7 +1997,7 @@ func (s) TestCZTraceTopChannelDeletionTraceClear(t *testing.T) {
defer czCleanupWrapper(czCleanup, t)
e := tcpClearRREnv
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
r := manual.NewBuilderWithScheme("whatever")
r.InitialState(resolver.State{Addresses: []resolver.Address{{Addr: te.srvAddr}}})
te.resolverScheme = r.Scheme()

View File

@ -87,7 +87,7 @@ func (s) TestCredsBundleBoth(t *testing.T) {
te.customServerOptions = []grpc.ServerOption{
grpc.Creds(creds),
}
te.startServer(&testServer{})
te.startServer(testServer{}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -109,7 +109,7 @@ func (s) TestCredsBundleTransportCredentials(t *testing.T) {
te.customServerOptions = []grpc.ServerOption{
grpc.Creds(creds),
}
te.startServer(&testServer{})
te.startServer(testServer{}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -125,7 +125,7 @@ func (s) TestCredsBundlePerRPCCredentials(t *testing.T) {
te.customDialOptions = []grpc.DialOption{
grpc.WithCredentialsBundle(&testCredsBundle{t: t, mode: bundlePerRPCOnly}),
}
te.startServer(&testServer{})
te.startServer(testServer{}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -159,7 +159,7 @@ func (c *clientTimeoutCreds) Clone() credentials.TransportCredentials {
func (s) TestNonFailFastRPCSucceedOnTimeoutCreds(t *testing.T) {
te := newTest(t, env{name: "timeout-cred", network: "tcp", security: "empty"})
te.userAgent = testAppUA
te.startServer(&testServer{security: te.e.security})
te.startServer(testServer{security: te.e.security}.Svc())
defer te.tearDown()
cc := te.clientConn(grpc.WithTransportCredentials(&clientTimeoutCreds{}))
@ -183,7 +183,7 @@ func (s) TestGRPCMethodAccessibleToCredsViaContextRequestInfo(t *testing.T) {
const wantMethod = "/grpc.testing.TestService/EmptyCall"
te := newTest(t, env{name: "context-request-info", network: "tcp"})
te.userAgent = testAppUA
te.startServer(&testServer{security: te.e.security})
te.startServer(testServer{security: te.e.security}.Svc())
defer te.tearDown()
cc := te.clientConn(grpc.WithPerRPCCredentials(&methodTestCreds{}))
@ -218,7 +218,7 @@ func (c clientAlwaysFailCred) Clone() credentials.TransportCredentials {
func (s) TestFailFastRPCErrorOnBadCertificates(t *testing.T) {
te := newTest(t, env{name: "bad-cred", network: "tcp", security: "empty", balancer: "round_robin"})
te.startServer(&testServer{security: te.e.security})
te.startServer(testServer{security: te.e.security}.Svc())
defer te.tearDown()
opts := []grpc.DialOption{grpc.WithTransportCredentials(clientAlwaysFailCred{})}
@ -246,7 +246,7 @@ func (s) TestFailFastRPCErrorOnBadCertificates(t *testing.T) {
func (s) TestWaitForReadyRPCErrorOnBadCertificates(t *testing.T) {
te := newTest(t, env{name: "bad-cred", network: "tcp", security: "empty", balancer: "round_robin"})
te.startServer(&testServer{security: te.e.security})
te.startServer(testServer{security: te.e.security}.Svc())
defer te.tearDown()
opts := []grpc.DialOption{grpc.WithTransportCredentials(clientAlwaysFailCred{})}
@ -312,7 +312,7 @@ func testPerRPCCredentialsViaDialOptions(t *testing.T, e env) {
te := newTest(t, e)
te.tapHandle = authHandle
te.perRPCCreds = testPerRPCCredentials{}
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -331,7 +331,7 @@ func (s) TestPerRPCCredentialsViaCallOptions(t *testing.T) {
func testPerRPCCredentialsViaCallOptions(t *testing.T, e env) {
te := newTest(t, e)
te.tapHandle = authHandle
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -371,7 +371,7 @@ func testPerRPCCredentialsViaDialOptionsAndCallOptions(t *testing.T, e env) {
}
return ctx, nil
}
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()

View File

@ -136,6 +136,20 @@ type testServer struct {
var _ testpb.UnstableTestServiceService = (*testServer)(nil)
// Svc returns a registerable TestService for this testServer instances.
// Because `s` is passed by value for convenience, any subsequent changes to
// `s` are not recognized.
func (s testServer) Svc() *testpb.TestServiceService {
return &testpb.TestServiceService{
EmptyCall: s.EmptyCall,
UnaryCall: s.UnaryCall,
StreamingOutputCall: s.StreamingOutputCall,
StreamingInputCall: s.StreamingInputCall,
FullDuplexCall: s.FullDuplexCall,
HalfDuplexCall: s.HalfDuplexCall,
}
}
func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
if md, ok := metadata.FromIncomingContext(ctx); ok {
// For testing purpose, returns an error if user-agent is failAppUA.
@ -566,7 +580,7 @@ func newTest(t *testing.T, e env) *test {
return te
}
func (te *test) listenAndServe(ts interface{}, listen func(network, address string) (net.Listener, error)) net.Listener {
func (te *test) listenAndServe(ts *testpb.TestServiceService, listen func(network, address string) (net.Listener, error)) net.Listener {
te.t.Helper()
te.t.Logf("Running test in %s environment...", te.e.name)
sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(te.maxStream)}
@ -626,7 +640,7 @@ func (te *test) listenAndServe(ts interface{}, listen func(network, address stri
sopts = append(sopts, te.customServerOptions...)
s := grpc.NewServer(sopts...)
if ts != nil {
testpb.RegisterTestServiceService(s, testpb.NewTestServiceService(ts))
testpb.RegisterTestServiceService(s, ts)
}
// Create a new default health server if enableHealthServer is set, or use
@ -691,20 +705,20 @@ func (w wrapHS) Stop() {
w.s.Close()
}
func (te *test) startServerWithConnControl(ts interface{}) *listenerWrapper {
func (te *test) startServerWithConnControl(ts *testpb.TestServiceService) *listenerWrapper {
l := te.listenAndServe(ts, listenWithConnControl)
return l.(*listenerWrapper)
}
// startServer starts a gRPC server exposing the provided TestService
// implementation. Callers should defer a call to te.tearDown to clean up
func (te *test) startServer(ts interface{}) {
func (te *test) startServer(ts *testpb.TestServiceService) {
te.t.Helper()
te.listenAndServe(ts, net.Listen)
}
// startServers starts 'num' gRPC servers exposing the provided TestService.
func (te *test) startServers(ts interface{}, num int) {
func (te *test) startServers(ts *testpb.TestServiceService, num int) {
for i := 0; i < num; i++ {
te.startServer(ts)
te.srvs = append(te.srvs, te.srv.(*grpc.Server))
@ -912,7 +926,7 @@ func (s) TestContextDeadlineNotIgnored(t *testing.T) {
}
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -946,7 +960,7 @@ func testTimeoutOnDeadServer(t *testing.T, e env) {
"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
"grpc: addrConn.resetTransport failed to create client transport: connection error",
)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -988,7 +1002,7 @@ func (s) TestServerGracefulStopIdempotent(t *testing.T) {
func testServerGracefulStopIdempotent(t *testing.T, e env) {
te := newTest(t, e)
te.userAgent = testAppUA
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
for i := 0; i < 3; i++ {
@ -1008,7 +1022,7 @@ func (s) TestServerGoAway(t *testing.T) {
func testServerGoAway(t *testing.T, e env) {
te := newTest(t, e)
te.userAgent = testAppUA
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -1060,7 +1074,7 @@ func testServerGoAwayPendingRPC(t *testing.T, e env) {
"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
"grpc: addrConn.resetTransport failed to create client transport: connection error",
)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -1134,7 +1148,7 @@ func testServerMultipleGoAwayPendingRPC(t *testing.T, e env) {
"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
"grpc: addrConn.resetTransport failed to create client transport: connection error",
)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -1221,7 +1235,7 @@ func testConcurrentClientConnCloseAndServerGoAway(t *testing.T, e env) {
"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
"grpc: addrConn.resetTransport failed to create client transport: connection error",
)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -1258,7 +1272,7 @@ func testConcurrentServerStopAndGoAway(t *testing.T, e env) {
"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
"grpc: addrConn.resetTransport failed to create client transport: connection error",
)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -1333,7 +1347,7 @@ func (s) TestClientConnCloseAfterGoAwayWithActiveStream(t *testing.T) {
func testClientConnCloseAfterGoAwayWithActiveStream(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
@ -1372,7 +1386,7 @@ func testFailFast(t *testing.T, e env) {
"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
"grpc: addrConn.resetTransport failed to create client transport: connection error",
)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -1731,7 +1745,7 @@ func (s) TestServiceConfigMaxMsgSize(t *testing.T) {
te1.resolverScheme = r.Scheme()
te1.nonBlockingDial = true
te1.startServer(&testServer{security: e.security})
te1.startServer(testServer{security: e.security}.Svc())
cc1 := te1.clientConn(grpc.WithResolvers(r))
addrs := []resolver.Address{{Addr: te1.srvAddr}}
@ -1821,7 +1835,7 @@ func (s) TestServiceConfigMaxMsgSize(t *testing.T) {
te2.maxClientReceiveMsgSize = newInt(1024)
te2.maxClientSendMsgSize = newInt(1024)
te2.startServer(&testServer{security: e.security})
te2.startServer(testServer{security: e.security}.Svc())
defer te2.tearDown()
cc2 := te2.clientConn(grpc.WithResolvers(r))
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: te2.srvAddr}}, ServiceConfig: sc})
@ -1881,7 +1895,7 @@ func (s) TestServiceConfigMaxMsgSize(t *testing.T) {
te3.maxClientReceiveMsgSize = newInt(4096)
te3.maxClientSendMsgSize = newInt(4096)
te3.startServer(&testServer{security: e.security})
te3.startServer(testServer{security: e.security}.Svc())
defer te3.tearDown()
cc3 := te3.clientConn(grpc.WithResolvers(r))
@ -1965,7 +1979,7 @@ func (s) TestServiceConfigMaxMsgSize(t *testing.T) {
// test makes sure read from streaming RPC doesn't fail in this case.
func (s) TestStreamingRPCWithTimeoutInServiceConfigRecv(t *testing.T) {
te := testServiceConfigSetup(t, tcpClearRREnv)
te.startServer(&testServer{security: tcpClearRREnv.security})
te.startServer(testServer{security: tcpClearRREnv.security}.Svc())
defer te.tearDown()
r := manual.NewBuilderWithScheme("whatever")
@ -2047,7 +2061,7 @@ func testPreloaderClientSend(t *testing.T, e env) {
"grpc: addrConn.resetTransport failed to create client transport: connection error",
"Failed to dial : context canceled; please retry.",
)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -2121,7 +2135,7 @@ func testMaxMsgSizeClientDefault(t *testing.T, e env) {
"grpc: addrConn.resetTransport failed to create client transport: connection error",
"Failed to dial : context canceled; please retry.",
)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -2185,7 +2199,7 @@ func testMaxMsgSizeClientAPI(t *testing.T, e env) {
"grpc: addrConn.resetTransport failed to create client transport: connection error",
"Failed to dial : context canceled; please retry.",
)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -2270,7 +2284,7 @@ func testMaxMsgSizeServerAPI(t *testing.T, e env) {
"grpc: addrConn.resetTransport failed to create client transport: connection error",
"Failed to dial : context canceled; please retry.",
)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -2375,7 +2389,7 @@ func testTap(t *testing.T, e env) {
"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
"grpc: addrConn.resetTransport failed to create client transport: connection error",
)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -2470,7 +2484,7 @@ func (s) TestHealthCheckSuccess(t *testing.T) {
func testHealthCheckSuccess(t *testing.T, e env) {
te := newTest(t, e)
te.enableHealthServer = true
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
defer te.tearDown()
@ -2492,7 +2506,7 @@ func testHealthCheckFailure(t *testing.T, e env) {
"grpc: the client connection is closing; please retry",
)
te.enableHealthServer = true
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
defer te.tearDown()
@ -2516,7 +2530,7 @@ func (s) TestHealthCheckOff(t *testing.T) {
func testHealthCheckOff(t *testing.T, e env) {
te := newTest(t, e)
te.enableHealthServer = true
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
verifyHealthCheckErrCode(t, 1*time.Second, te.clientConn(), defaultHealthService, codes.NotFound)
@ -2533,7 +2547,7 @@ func (s) TestHealthWatchMultipleClients(t *testing.T) {
func testHealthWatchMultipleClients(t *testing.T, e env) {
te := newTest(t, e)
te.enableHealthServer = true
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -2562,7 +2576,7 @@ func (s) TestHealthWatchSameStatus(t *testing.T) {
func testHealthWatchSameStatus(t *testing.T, e env) {
te := newTest(t, e)
te.enableHealthServer = true
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
stream, cf := newHealthCheckStream(t, te.clientConn(), defaultHealthService)
@ -2590,7 +2604,7 @@ func testHealthWatchSetServiceStatusBeforeStartingServer(t *testing.T, e env) {
te := newTest(t, e)
te.healthServer = hs
hs.SetServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
stream, cf := newHealthCheckStream(t, te.clientConn(), defaultHealthService)
@ -2611,7 +2625,7 @@ func (s) TestHealthWatchDefaultStatusChange(t *testing.T) {
func testHealthWatchDefaultStatusChange(t *testing.T, e env) {
te := newTest(t, e)
te.enableHealthServer = true
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
stream, cf := newHealthCheckStream(t, te.clientConn(), defaultHealthService)
@ -2632,7 +2646,7 @@ func (s) TestHealthWatchSetServiceStatusBeforeClientCallsWatch(t *testing.T) {
func testHealthWatchSetServiceStatusBeforeClientCallsWatch(t *testing.T, e env) {
te := newTest(t, e)
te.enableHealthServer = true
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
te.setHealthServingStatus(defaultHealthService, healthpb.HealthCheckResponse_SERVING)
defer te.tearDown()
@ -2652,7 +2666,7 @@ func (s) TestHealthWatchOverallServerHealthChange(t *testing.T) {
func testHealthWatchOverallServerHealthChange(t *testing.T, e env) {
te := newTest(t, e)
te.enableHealthServer = true
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
stream, cf := newHealthCheckStream(t, te.clientConn(), "")
@ -2684,7 +2698,7 @@ func (s) TestUnknownHandler(t *testing.T) {
func testUnknownHandler(t *testing.T, e env, unknownHandler grpc.StreamHandler) {
te := newTest(t, e)
te.unknownHandler = unknownHandler
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
verifyHealthCheckErrCode(t, 1*time.Second, te.clientConn(), "", codes.Unauthenticated)
}
@ -2700,7 +2714,7 @@ func (s) TestHealthCheckServingStatus(t *testing.T) {
func testHealthCheckServingStatus(t *testing.T, e env) {
te := newTest(t, e)
te.enableHealthServer = true
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -2721,7 +2735,7 @@ func (s) TestEmptyUnaryWithUserAgent(t *testing.T) {
func testEmptyUnaryWithUserAgent(t *testing.T, e env) {
te := newTest(t, e)
te.userAgent = testAppUA
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -2752,7 +2766,7 @@ func (s) TestFailedEmptyUnary(t *testing.T) {
func testFailedEmptyUnary(t *testing.T, e env) {
te := newTest(t, e)
te.userAgent = failAppUA
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -2771,7 +2785,7 @@ func (s) TestLargeUnary(t *testing.T) {
func testLargeUnary(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -2810,7 +2824,7 @@ func testExceedMsgLimit(t *testing.T, e env) {
te := newTest(t, e)
maxMsgSize := 1024
te.maxServerMsgSize, te.maxClientMsgSize = newInt(maxMsgSize), newInt(maxMsgSize)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -2889,7 +2903,7 @@ func (s) TestPeerClientSide(t *testing.T) {
func testPeerClientSide(t *testing.T, e env) {
te := newTest(t, e)
te.userAgent = testAppUA
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
peer := new(peer.Peer)
@ -2927,7 +2941,7 @@ func (s) TestPeerNegative(t *testing.T) {
func testPeerNegative(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -2947,7 +2961,7 @@ func (s) TestPeerFailedRPC(t *testing.T) {
func testPeerFailedRPC(t *testing.T, e env) {
te := newTest(t, e)
te.maxServerReceiveMsgSize = newInt(1 * 1024)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -3001,7 +3015,7 @@ func (s) TestMetadataUnaryRPC(t *testing.T) {
func testMetadataUnaryRPC(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -3046,7 +3060,7 @@ func (s) TestMetadataOrderUnaryRPC(t *testing.T) {
func testMetadataOrderUnaryRPC(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -3083,7 +3097,7 @@ func (s) TestMultipleSetTrailerUnaryRPC(t *testing.T) {
func testMultipleSetTrailerUnaryRPC(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security, multipleSetTrailer: true})
te.startServer(testServer{security: e.security, multipleSetTrailer: true}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -3120,7 +3134,7 @@ func (s) TestMultipleSetTrailerStreamingRPC(t *testing.T) {
func testMultipleSetTrailerStreamingRPC(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security, multipleSetTrailer: true})
te.startServer(testServer{security: e.security, multipleSetTrailer: true}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -3155,7 +3169,7 @@ func (s) TestSetAndSendHeaderUnaryRPC(t *testing.T) {
// To test header metadata is sent on SendHeader().
func testSetAndSendHeaderUnaryRPC(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security, setAndSendHeader: true})
te.startServer(testServer{security: e.security, setAndSendHeader: true}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -3198,7 +3212,7 @@ func (s) TestMultipleSetHeaderUnaryRPC(t *testing.T) {
// To test header metadata is sent when sending response.
func testMultipleSetHeaderUnaryRPC(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security, setHeaderOnly: true})
te.startServer(testServer{security: e.security, setHeaderOnly: true}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -3242,7 +3256,7 @@ func (s) TestMultipleSetHeaderUnaryRPCError(t *testing.T) {
// To test header metadata is sent when sending status.
func testMultipleSetHeaderUnaryRPCError(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security, setHeaderOnly: true})
te.startServer(testServer{security: e.security, setHeaderOnly: true}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -3285,7 +3299,7 @@ func (s) TestSetAndSendHeaderStreamingRPC(t *testing.T) {
// To test header metadata is sent on SendHeader().
func testSetAndSendHeaderStreamingRPC(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security, setAndSendHeader: true})
te.startServer(testServer{security: e.security, setAndSendHeader: true}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -3325,7 +3339,7 @@ func (s) TestMultipleSetHeaderStreamingRPC(t *testing.T) {
// To test header metadata is sent when sending response.
func testMultipleSetHeaderStreamingRPC(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security, setHeaderOnly: true})
te.startServer(testServer{security: e.security, setHeaderOnly: true}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -3389,7 +3403,7 @@ func (s) TestMultipleSetHeaderStreamingRPCError(t *testing.T) {
// To test header metadata is sent when sending status.
func testMultipleSetHeaderStreamingRPCError(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security, setHeaderOnly: true})
te.startServer(testServer{security: e.security, setHeaderOnly: true}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -3454,7 +3468,7 @@ func (s) TestMalformedHTTP2Metadata(t *testing.T) {
func testMalformedHTTP2Metadata(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -3497,7 +3511,7 @@ func testTransparentRetry(t *testing.T, e env) {
}
return ctx, nil
}
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -3546,7 +3560,7 @@ func (s) TestCancel(t *testing.T) {
func testCancel(t *testing.T, e env) {
te := newTest(t, e)
te.declareLogNoise("grpc: the client connection is closing; please retry")
te.startServer(&testServer{security: e.security, unaryCallSleepTime: time.Second})
te.startServer(testServer{security: e.security, unaryCallSleepTime: time.Second}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -3583,7 +3597,7 @@ func testCancelNoIO(t *testing.T, e env) {
te := newTest(t, e)
te.declareLogNoise("http2Client.notifyError got notified that the client transport was broken")
te.maxStream = 1 // Only allows 1 live stream per server transport.
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -3592,7 +3606,7 @@ func testCancelNoIO(t *testing.T, e env) {
// Start one blocked RPC for which we'll never send streaming
// input. This will consume the 1 maximum concurrent streams,
// causing future RPCs to hang.
ctx, cancelFirst := context.WithCancel(context.Background())
ctx, cancelFirst := context.WithTimeout(context.Background(), 5*time.Second)
_, err := tc.StreamingInputCall(ctx)
if err != nil {
t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
@ -3605,6 +3619,9 @@ func testCancelNoIO(t *testing.T, e env) {
// succeeding.
// TODO(bradfitz): add internal test hook for this (Issue 534)
for {
if ctx.Err() != nil {
t.Fatal("timed out waiting to get deadline exceeded error")
}
ctx, cancelSecond := context.WithTimeout(context.Background(), 50*time.Millisecond)
_, err := tc.StreamingInputCall(ctx)
cancelSecond()
@ -3672,7 +3689,7 @@ func (s) TestPingPong(t *testing.T) {
func testPingPong(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -3731,7 +3748,7 @@ func (s) TestMetadataStreamingRPC(t *testing.T) {
func testMetadataStreamingRPC(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -3808,7 +3825,7 @@ func (s) TestServerStreaming(t *testing.T) {
func testServerStreaming(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -3863,7 +3880,7 @@ func (s) TestFailedServerStreaming(t *testing.T) {
func testFailedServerStreaming(t *testing.T, e env) {
te := newTest(t, e)
te.userAgent = failAppUA
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -3892,15 +3909,10 @@ func equalError(x, y error) bool {
return x == y || (x != nil && y != nil && x.Error() == y.Error())
}
// concurrentSendServer is a TestServiceService whose
// StreamingOutputCall makes ten serial Send calls, sending payloads
// concurrentSendStreamingOutputCall makes ten serial Send calls, sending payloads
// "0".."9", inclusive. TestServerStreamingConcurrent verifies they
// were received in the correct order, and that there were no races.
//
// All other TestServiceService methods return unimplemented if called.
type concurrentSendServer struct{}
func (s concurrentSendServer) StreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error {
func concurrentSendStreamingOutputCall(args *testpb.StreamingOutputCallRequest, stream testpb.TestService_StreamingOutputCallServer) error {
for i := 0; i < 10; i++ {
stream.Send(&testpb.StreamingOutputCallResponse{
Payload: &testpb.Payload{
@ -3920,7 +3932,7 @@ func (s) TestServerStreamingConcurrent(t *testing.T) {
func testServerStreamingConcurrent(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(concurrentSendServer{})
te.startServer(&testpb.TestServiceService{StreamingOutputCall: concurrentSendStreamingOutputCall})
defer te.tearDown()
cc := te.clientConn()
@ -4001,7 +4013,7 @@ func (s) TestClientStreaming(t *testing.T) {
func testClientStreaming(t *testing.T, e env, sizes []int) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -4047,7 +4059,7 @@ func (s) TestClientStreamingError(t *testing.T) {
func testClientStreamingError(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security, earlyFail: true})
te.startServer(testServer{security: e.security, earlyFail: true}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -4092,7 +4104,7 @@ func testExceedMaxStreamsLimit(t *testing.T, e env) {
"grpc: the connection is closing",
)
te.maxStream = 1 // Only allows 1 live stream per server transport.
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -4132,7 +4144,7 @@ func testStreamsQuotaRecovery(t *testing.T, e env) {
"grpc: the connection is closing",
)
te.maxStream = 1 // Allows 1 live stream.
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -4202,7 +4214,7 @@ func testCompressServerHasNoSupport(t *testing.T, e env) {
te.serverCompression = false
te.clientCompression = false
te.clientNopCompression = true
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -4240,7 +4252,7 @@ func testCompressOK(t *testing.T, e env) {
te := newTest(t, e)
te.serverCompression = true
te.clientCompression = true
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -4301,7 +4313,7 @@ func (s) TestIdentityEncoding(t *testing.T) {
func testIdentityEncoding(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -4365,7 +4377,7 @@ func testUnaryClientInterceptor(t *testing.T, e env) {
te := newTest(t, e)
te.userAgent = testAppUA
te.unaryClientInt = failOkayRPC
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -4391,7 +4403,7 @@ func failOkayStream(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientC
func testStreamClientInterceptor(t *testing.T, e env) {
te := newTest(t, e)
te.streamClientInt = failOkayStream
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -4427,7 +4439,7 @@ func errInjector(ctx context.Context, req interface{}, info *grpc.UnaryServerInf
func testUnaryServerInterceptor(t *testing.T, e env) {
te := newTest(t, e)
te.unaryServerInt = errInjector
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -4457,7 +4469,7 @@ func fullDuplexOnly(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServ
func testStreamServerInterceptor(t *testing.T, e env) {
te := newTest(t, e)
te.streamServerInt = fullDuplexOnly
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -4494,28 +4506,6 @@ func testStreamServerInterceptor(t *testing.T, e env) {
}
}
// funcServer implements methods of TestServiceService using funcs,
// similar to an http.HandlerFunc.
// Any unimplemented method will return unimplemented. Tests implement the method(s)
// they need.
type funcServer struct {
unaryCall func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error)
streamingInputCall func(stream testpb.TestService_StreamingInputCallServer) error
fullDuplexCall func(stream testpb.TestService_FullDuplexCallServer) error
}
func (s *funcServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return s.unaryCall(ctx, in)
}
func (s *funcServer) StreamingInputCall(stream testpb.TestService_StreamingInputCallServer) error {
return s.streamingInputCall(stream)
}
func (s *funcServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
return s.fullDuplexCall(stream)
}
func (s) TestClientRequestBodyErrorUnexpectedEOF(t *testing.T) {
for _, e := range listTestEnv() {
testClientRequestBodyErrorUnexpectedEOF(t, e)
@ -4524,7 +4514,7 @@ func (s) TestClientRequestBodyErrorUnexpectedEOF(t *testing.T) {
func testClientRequestBodyErrorUnexpectedEOF(t *testing.T, e env) {
te := newTest(t, e)
ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
ts := &testpb.TestServiceService{UnaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
errUnexpectedCall := errors.New("unexpected call func server method")
t.Error(errUnexpectedCall)
return nil, errUnexpectedCall
@ -4548,7 +4538,7 @@ func (s) TestClientRequestBodyErrorCloseAfterLength(t *testing.T) {
func testClientRequestBodyErrorCloseAfterLength(t *testing.T, e env) {
te := newTest(t, e)
te.declareLogNoise("Server.processUnaryRPC failed to write status")
ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
ts := &testpb.TestServiceService{UnaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
errUnexpectedCall := errors.New("unexpected call func server method")
t.Error(errUnexpectedCall)
return nil, errUnexpectedCall
@ -4572,7 +4562,7 @@ func (s) TestClientRequestBodyErrorCancel(t *testing.T) {
func testClientRequestBodyErrorCancel(t *testing.T, e env) {
te := newTest(t, e)
gotCall := make(chan bool, 1)
ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
ts := &testpb.TestServiceService{UnaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
gotCall <- true
return new(testpb.SimpleResponse), nil
}}
@ -4608,7 +4598,7 @@ func (s) TestClientRequestBodyErrorCancelStreamingInput(t *testing.T) {
func testClientRequestBodyErrorCancelStreamingInput(t *testing.T, e env) {
te := newTest(t, e)
recvErr := make(chan error, 1)
ts := &funcServer{streamingInputCall: func(stream testpb.TestService_StreamingInputCallServer) error {
ts := &testpb.TestServiceService{StreamingInputCall: func(stream testpb.TestService_StreamingInputCallServer) error {
_, err := stream.Recv()
recvErr <- err
return nil
@ -4650,7 +4640,7 @@ func testClientInitialHeaderEndStream(t *testing.T, e env) {
// checking.
handlerDone := make(chan struct{})
te := newTest(t, e)
ts := &funcServer{streamingInputCall: func(stream testpb.TestService_StreamingInputCallServer) error {
ts := &testpb.TestServiceService{StreamingInputCall: func(stream testpb.TestService_StreamingInputCallServer) error {
defer close(handlerDone)
// Block on serverTester receiving RST_STREAM. This ensures server has closed
// stream before stream.Recv().
@ -4694,7 +4684,7 @@ func testClientSendDataAfterCloseSend(t *testing.T, e env) {
// checking.
handlerDone := make(chan struct{})
te := newTest(t, e)
ts := &funcServer{streamingInputCall: func(stream testpb.TestService_StreamingInputCallServer) error {
ts := &testpb.TestServiceService{StreamingInputCall: func(stream testpb.TestService_StreamingInputCallServer) error {
defer close(handlerDone)
// Block on serverTester receiving RST_STREAM. This ensures server has closed
// stream before stream.Recv().
@ -4746,7 +4736,7 @@ func (s) TestClientResourceExhaustedCancelFullDuplex(t *testing.T) {
func testClientResourceExhaustedCancelFullDuplex(t *testing.T, e env) {
te := newTest(t, e)
recvErr := make(chan error, 1)
ts := &funcServer{fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
ts := &testpb.TestServiceService{FullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
defer close(recvErr)
_, err := stream.Recv()
if err != nil {
@ -5484,7 +5474,7 @@ func testConfigurableWindowSize(t *testing.T, e env, wc windowSizeConfig) {
te.clientInitialWindowSize = wc.clientStream
te.clientInitialConnWindowSize = wc.clientConn
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -5534,7 +5524,7 @@ func (s) TestWaitForReadyConnection(t *testing.T) {
func testWaitForReadyConnection(t *testing.T, e env) {
te := newTest(t, e)
te.userAgent = testAppUA
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn() // Non-blocking dial.
@ -5585,7 +5575,7 @@ func testEncodeDoesntPanic(t *testing.T, e env) {
te := newTest(t, e)
erc := &errCodec{}
te.customCodec = erc
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
te.customCodec = nil
tc := testpb.NewTestServiceClient(te.clientConn())
@ -5619,7 +5609,7 @@ func testSvrWriteStatusEarlyWrite(t *testing.T, e env) {
if err != nil {
t.Fatal(err)
}
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
respParam := []*testpb.ResponseParameters{
@ -5898,7 +5888,7 @@ func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) {
}
// Case1: sc set maxReqSize to 2048 (send), maxRespSize to 2048 (recv).
te1, ch1 := testServiceConfigSetupTD(t, e)
te1.startServer(&testServer{security: e.security})
te1.startServer(testServer{security: e.security}.Svc())
defer te1.tearDown()
ch1 <- sc
@ -5958,7 +5948,7 @@ func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) {
te2, ch2 := testServiceConfigSetupTD(t, e)
te2.maxClientReceiveMsgSize = newInt(1024)
te2.maxClientSendMsgSize = newInt(1024)
te2.startServer(&testServer{security: e.security})
te2.startServer(testServer{security: e.security}.Svc())
defer te2.tearDown()
ch2 <- sc
tc = testpb.NewTestServiceClient(te2.clientConn())
@ -6007,7 +5997,7 @@ func testServiceConfigMaxMsgSizeTD(t *testing.T, e env) {
te3, ch3 := testServiceConfigSetupTD(t, e)
te3.maxClientReceiveMsgSize = newInt(4096)
te3.maxClientSendMsgSize = newInt(4096)
te3.startServer(&testServer{security: e.security})
te3.startServer(testServer{security: e.security}.Svc())
defer te3.tearDown()
ch3 <- sc
tc = testpb.NewTestServiceClient(te3.clientConn())
@ -6099,7 +6089,7 @@ func (s) TestMethodFromServerStream(t *testing.T) {
func (s) TestInterceptorCanAccessCallOptions(t *testing.T) {
e := tcpClearRREnv
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
type observedOptions struct {
@ -6210,7 +6200,7 @@ func testCompressorRegister(t *testing.T, e env) {
te.serverCompression = false
te.clientUseCompression = true
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -6385,7 +6375,7 @@ func testClientDoesntDeadlockWhileWritingErrornousLargeMessages(t *testing.T, e
te.userAgent = testAppUA
smallSize := 1024
te.maxServerReceiveMsgSize = &smallSize
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 1048576)
@ -6422,7 +6412,7 @@ func (s) TestRPCTimeout(t *testing.T) {
func testRPCTimeout(t *testing.T, e env) {
te := newTest(t, e)
te.startServer(&testServer{security: e.security, unaryCallSleepTime: 500 * time.Millisecond})
te.startServer(testServer{security: e.security, unaryCallSleepTime: 500 * time.Millisecond}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -6545,7 +6535,7 @@ func testServerMaxHeaderListSizeClientUserViolation(t *testing.T, e env) {
te := newTest(t, e)
te.maxServerHeaderListSize = new(uint32)
*te.maxServerHeaderListSize = 216
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -6577,7 +6567,7 @@ func testClientMaxHeaderListSizeServerUserViolation(t *testing.T, e env) {
te := newTest(t, e)
te.maxClientHeaderListSize = new(uint32)
*te.maxClientHeaderListSize = 1 // any header server sends will violate
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc := te.clientConn()
@ -6608,7 +6598,7 @@ func testServerMaxHeaderListSizeClientIntentionalViolation(t *testing.T, e env)
te := newTest(t, e)
te.maxServerHeaderListSize = new(uint32)
*te.maxServerHeaderListSize = 512
te.startServer(&testServer{security: e.security})
te.startServer(testServer{security: e.security}.Svc())
defer te.tearDown()
cc, dw := te.clientConnWithConnControl()
@ -6650,7 +6640,7 @@ func testClientMaxHeaderListSizeServerIntentionalViolation(t *testing.T, e env)
te := newTest(t, e)
te.maxClientHeaderListSize = new(uint32)
*te.maxClientHeaderListSize = 200
lw := te.startServerWithConnControl(&testServer{security: e.security, setHeaderOnly: true})
lw := te.startServerWithConnControl(testServer{security: e.security, setHeaderOnly: true}.Svc())
defer te.tearDown()
cc, _ := te.clientConnWithConnControl()
tc := &testServiceClientWrapper{TestServiceClient: testpb.NewTestServiceClient(cc)}
@ -6697,10 +6687,10 @@ func (s) TestNetPipeConn(t *testing.T) {
pl := testutils.NewPipeListener()
s := grpc.NewServer()
defer s.Stop()
ts := &funcServer{unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
ts := &testpb.TestServiceService{UnaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
}}
testpb.RegisterTestServiceService(s, testpb.NewTestServiceService(ts))
testpb.RegisterTestServiceService(s, ts)
go s.Serve(pl)
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
@ -6725,7 +6715,18 @@ func testLargeTimeout(t *testing.T, e env) {
te := newTest(t, e)
te.declareLogNoise("Server.processUnaryRPC failed to write status")
ts := &funcServer{}
maxTimeoutChan := make(chan time.Duration, 1)
ts := &testpb.TestServiceService{UnaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
maxTimeout := <-maxTimeoutChan
deadline, ok := ctx.Deadline()
timeout := time.Until(deadline)
minTimeout := maxTimeout - 5*time.Second
if !ok || timeout < minTimeout || timeout > maxTimeout {
t.Errorf("ctx.Deadline() = (now+%v), %v; want [%v, %v], true", timeout, ok, minTimeout, maxTimeout)
return nil, status.Error(codes.OutOfRange, "deadline error")
}
return &testpb.SimpleResponse{}, nil
}}
te.startServer(ts)
defer te.tearDown()
tc := testpb.NewTestServiceClient(te.clientConn())
@ -6737,17 +6738,7 @@ func testLargeTimeout(t *testing.T, e env) {
}
for i, maxTimeout := range timeouts {
ts.unaryCall = func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
deadline, ok := ctx.Deadline()
timeout := time.Until(deadline)
minTimeout := maxTimeout - 5*time.Second
if !ok || timeout < minTimeout || timeout > maxTimeout {
t.Errorf("ctx.Deadline() = (now+%v), %v; want [%v, %v], true", timeout, ok, minTimeout, maxTimeout)
return nil, status.Error(codes.OutOfRange, "deadline error")
}
return &testpb.SimpleResponse{}, nil
}
maxTimeoutChan <- maxTimeout
ctx, cancel := context.WithTimeout(context.Background(), maxTimeout)
defer cancel()
@ -6770,11 +6761,11 @@ func (s) TestGoAwayThenClose(t *testing.T) {
}
s1 := grpc.NewServer()
defer s1.Stop()
ts := &funcServer{
unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
ts := &testpb.TestServiceService{
UnaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil
},
fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
FullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
// Wait forever.
_, err := stream.Recv()
if err == nil {
@ -6783,7 +6774,7 @@ func (s) TestGoAwayThenClose(t *testing.T) {
return err
},
}
testpb.RegisterTestServiceService(s1, testpb.NewTestServiceService(ts))
testpb.RegisterTestServiceService(s1, ts)
go s1.Serve(lis1)
conn2Established := grpcsync.NewEvent()
@ -6793,7 +6784,7 @@ func (s) TestGoAwayThenClose(t *testing.T) {
}
s2 := grpc.NewServer()
defer s2.Stop()
testpb.RegisterTestServiceService(s2, testpb.NewTestServiceService(ts))
testpb.RegisterTestServiceService(s2, ts)
go s2.Serve(lis2)
r := manual.NewBuilderWithScheme("whatever")
@ -6863,7 +6854,7 @@ func (lis notifyingListener) Accept() (net.Conn, error) {
func (s) TestRPCWaitsForResolver(t *testing.T) {
te := testServiceConfigSetup(t, tcpClearRREnv)
te.startServer(&testServer{security: tcpClearRREnv.security})
te.startServer(testServer{security: tcpClearRREnv.security}.Svc())
defer te.tearDown()
r := manual.NewBuilderWithScheme("whatever")

View File

@ -140,7 +140,7 @@ func setupServer(sc *svrConfig) (s *grpc.Server, lis net.Listener, ts *testHealt
ts = newTestHealthServer()
}
healthgrpc.RegisterHealthServer(s, ts)
testpb.RegisterTestServiceService(s, testpb.NewTestServiceService(&testServer{}))
testpb.RegisterTestServiceService(s, testServer{}.Svc())
go s.Serve(lis)
return s, lis, ts, s.Stop, nil
}