mirror of https://github.com/grpc/grpc-go.git
test: move e2e goaway tests to goaway_test.go (#5820)
parent 0fe49e823f
commit 9f97673ba4
@@ -64,7 +64,6 @@ import (
	"google.golang.org/grpc/internal/stubserver"
	"google.golang.org/grpc/internal/testutils"
	"google.golang.org/grpc/internal/transport"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/metadata"
	"google.golang.org/grpc/peer"
	"google.golang.org/grpc/resolver"
@@ -997,338 +996,6 @@ func testServerGracefulStopIdempotent(t *testing.T, e env) {
	}
}

func (s) TestServerGoAway(t *testing.T) {
	for _, e := range listTestEnv() {
		if e.name == "handler-tls" {
			continue
		}
		testServerGoAway(t, e)
	}
}

func testServerGoAway(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	// Finish an RPC to make sure the connection is good.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
	}
	ch := make(chan struct{})
	go func() {
		te.srv.GracefulStop()
		close(ch)
	}()
	// Loop until the server side GoAway signal is propagated to the client.
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) != codes.DeadlineExceeded {
			cancel()
			break
		}
		cancel()
	}
	// A new RPC should fail.
	ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable && status.Code(err) != codes.Internal {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s or %s", err, codes.Unavailable, codes.Internal)
	}
	<-ch
	awaitNewConnLogOutput()
}

func (s) TestServerGoAwayPendingRPC(t *testing.T) {
	for _, e := range listTestEnv() {
		if e.name == "handler-tls" {
			continue
		}
		testServerGoAwayPendingRPC(t, e)
	}
}

func testServerGoAwayPendingRPC(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
	)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	// Finish an RPC to make sure the connection is good.
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
	}
	ch := make(chan struct{})
	go func() {
		te.srv.GracefulStop()
		close(ch)
	}()
	// Loop until the server side GoAway signal is propagated to the client.
	start := time.Now()
	errored := false
	for time.Since(start) < time.Second {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
		_, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true))
		cancel()
		if err != nil {
			errored = true
			break
		}
	}
	if !errored {
		t.Fatalf("GoAway never received by client")
	}
	respParam := []*testpb.ResponseParameters{{Size: 1}}
	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
	if err != nil {
		t.Fatal(err)
	}
	req := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
		Payload:            payload,
	}
	// The existing RPC should be still good to proceed.
	if err := stream.Send(req); err != nil {
		t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
	}
	if _, err := stream.Recv(); err != nil {
		t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
	}
	// The RPC will run until canceled.
	cancel()
	<-ch
	awaitNewConnLogOutput()
}

func (s) TestServerMultipleGoAwayPendingRPC(t *testing.T) {
	for _, e := range listTestEnv() {
		if e.name == "handler-tls" {
			continue
		}
		testServerMultipleGoAwayPendingRPC(t, e)
	}
}

func testServerMultipleGoAwayPendingRPC(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
	)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	ctx, cancel := context.WithCancel(context.Background())
	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	// Finish an RPC to make sure the connection is good.
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
	}
	ch1 := make(chan struct{})
	go func() {
		te.srv.GracefulStop()
		close(ch1)
	}()
	ch2 := make(chan struct{})
	go func() {
		te.srv.GracefulStop()
		close(ch2)
	}()
	// Loop until the server side GoAway signal is propagated to the client.
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
			cancel()
			break
		}
		cancel()
	}
	select {
	case <-ch1:
		t.Fatal("GracefulStop() terminated early")
	case <-ch2:
		t.Fatal("GracefulStop() terminated early")
	default:
	}
	respParam := []*testpb.ResponseParameters{
		{
			Size: 1,
		},
	}
	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
	if err != nil {
		t.Fatal(err)
	}
	req := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
		Payload:            payload,
	}
	// The existing RPC should be still good to proceed.
	if err := stream.Send(req); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
	}
	if _, err := stream.Recv(); err != nil {
		t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
	}
	if err := stream.CloseSend(); err != nil {
		t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
	}
	<-ch1
	<-ch2
	cancel()
	awaitNewConnLogOutput()
}

func (s) TestConcurrentClientConnCloseAndServerGoAway(t *testing.T) {
	for _, e := range listTestEnv() {
		if e.name == "handler-tls" {
			continue
		}
		testConcurrentClientConnCloseAndServerGoAway(t, e)
	}
}

func testConcurrentClientConnCloseAndServerGoAway(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
	)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
	}
	ch := make(chan struct{})
	// Close ClientConn and Server concurrently.
	go func() {
		te.srv.GracefulStop()
		close(ch)
	}()
	go func() {
		cc.Close()
	}()
	<-ch
}

func (s) TestConcurrentServerStopAndGoAway(t *testing.T) {
	for _, e := range listTestEnv() {
		if e.name == "handler-tls" {
			continue
		}
		testConcurrentServerStopAndGoAway(t, e)
	}
}

func testConcurrentServerStopAndGoAway(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
	)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}

	// Finish an RPC to make sure the connection is good.
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
	}

	ch := make(chan struct{})
	go func() {
		te.srv.GracefulStop()
		close(ch)
	}()
	// Loop until the server side GoAway signal is propagated to the client.
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
			cancel()
			break
		}
		cancel()
	}
	// Stop the server and close all the connections.
	te.srv.Stop()
	respParam := []*testpb.ResponseParameters{
		{
			Size: 1,
		},
	}
	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
	if err != nil {
		t.Fatal(err)
	}
	req := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
		Payload:            payload,
	}
	sendStart := time.Now()
	for {
		if err := stream.Send(req); err == io.EOF {
			// stream.Send should eventually send io.EOF
			break
		} else if err != nil {
			// Send should never return a transport-level error.
			t.Fatalf("stream.Send(%v) = %v; want <nil or io.EOF>", req, err)
		}
		if time.Since(sendStart) > 2*time.Second {
			t.Fatalf("stream.Send(_) did not return io.EOF after 2s")
		}
		time.Sleep(time.Millisecond)
	}
	if _, err := stream.Recv(); err == nil || err == io.EOF {
		t.Fatalf("%v.Recv() = _, %v, want _, <non-nil, non-EOF>", stream, err)
	}
	<-ch
	awaitNewConnLogOutput()
}

func (s) TestDetailedConnectionCloseErrorPropagatesToRpcError(t *testing.T) {
	rpcStartedOnServer := make(chan struct{})
	rpcDoneOnClient := make(chan struct{})
@@ -1376,120 +1043,6 @@ func (s) TestDetailedConnectionCloseErrorPropagatesToRpcError(t *testing.T) {
	close(rpcDoneOnClient)
}

func (s) TestDetailedGoawayErrorOnGracefulClosePropagatesToRPCError(t *testing.T) {
	rpcDoneOnClient := make(chan struct{})
	ss := &stubserver.StubServer{
		FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error {
			<-rpcDoneOnClient
			return status.Error(codes.Internal, "arbitrary status")
		},
	}
	sopts := []grpc.ServerOption{
		grpc.KeepaliveParams(keepalive.ServerParameters{
			MaxConnectionAge:      time.Millisecond * 100,
			MaxConnectionAgeGrace: time.Millisecond,
		}),
	}
	if err := ss.Start(sopts); err != nil {
		t.Fatalf("Error starting endpoint server: %v", err)
	}
	defer ss.Stop()

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	stream, err := ss.Client.FullDuplexCall(ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall = _, %v, want _, <nil>", ss.Client, err)
	}
	const expectedErrorMessageSubstring = "received prior goaway: code: NO_ERROR"
	_, err = stream.Recv()
	close(rpcDoneOnClient)
	if err == nil || !strings.Contains(err.Error(), expectedErrorMessageSubstring) {
		t.Fatalf("%v.Recv() = _, %v, want _, rpc error containing substring: %q", stream, err, expectedErrorMessageSubstring)
	}
}

func (s) TestDetailedGoawayErrorOnAbruptClosePropagatesToRPCError(t *testing.T) {
	// set the min keepalive time very low so that this test can take
	// a reasonable amount of time
	prev := internal.KeepaliveMinPingTime
	internal.KeepaliveMinPingTime = time.Millisecond
	defer func() { internal.KeepaliveMinPingTime = prev }()

	rpcDoneOnClient := make(chan struct{})
	ss := &stubserver.StubServer{
		FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error {
			<-rpcDoneOnClient
			return status.Error(codes.Internal, "arbitrary status")
		},
	}
	sopts := []grpc.ServerOption{
		grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
			MinTime: time.Second * 1000, /* arbitrary, large value */
		}),
	}
	dopts := []grpc.DialOption{
		grpc.WithKeepaliveParams(keepalive.ClientParameters{
			Time:                time.Millisecond, /* should trigger "too many pings" error quickly */
			Timeout:             time.Second * 1000, /* arbitrary, large value */
			PermitWithoutStream: false,
		}),
	}
	if err := ss.Start(sopts, dopts...); err != nil {
		t.Fatalf("Error starting endpoint server: %v", err)
	}
	defer ss.Stop()

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	stream, err := ss.Client.FullDuplexCall(ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall = _, %v, want _, <nil>", ss.Client, err)
	}
	const expectedErrorMessageSubstring = `received prior goaway: code: ENHANCE_YOUR_CALM, debug data: "too_many_pings"`
	_, err = stream.Recv()
	close(rpcDoneOnClient)
	if err == nil || !strings.Contains(err.Error(), expectedErrorMessageSubstring) {
		t.Fatalf("%v.Recv() = _, %v, want _, rpc error containing substring: |%v|", stream, err, expectedErrorMessageSubstring)
	}
}

func (s) TestClientConnCloseAfterGoAwayWithActiveStream(t *testing.T) {
	for _, e := range listTestEnv() {
		if e.name == "handler-tls" {
			continue
		}
		testClientConnCloseAfterGoAwayWithActiveStream(t, e)
	}
}

func testClientConnCloseAfterGoAwayWithActiveStream(t *testing.T, e env) {
	te := newTest(t, e)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()
	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if _, err := tc.FullDuplexCall(ctx); err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err)
	}
	done := make(chan struct{})
	go func() {
		te.srv.GracefulStop()
		close(done)
	}()
	time.Sleep(50 * time.Millisecond)
	cc.Close()
	timeout := time.NewTimer(time.Second)
	select {
	case <-done:
	case <-timeout.C:
		t.Fatalf("Test timed-out.")
	}
}

func (s) TestFailFast(t *testing.T) {
	for _, e := range listTestEnv() {
		testFailFast(t, e)
@@ -7114,115 +6667,6 @@ func testLargeTimeout(t *testing.T, e env) {
	}
}

// Proxies typically send GO_AWAY followed by connection closure a minute or so later. This
// test ensures that the connection is re-created after GO_AWAY and not affected by the
// subsequent (old) connection closure.
func (s) TestGoAwayThenClose(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()

	lis1, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		t.Fatalf("Error while listening. Err: %v", err)
	}
	s1 := grpc.NewServer()
	defer s1.Stop()
	ts := &funcServer{
		unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
			return &testpb.SimpleResponse{}, nil
		},
		fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
			if err := stream.Send(&testpb.StreamingOutputCallResponse{}); err != nil {
				t.Errorf("unexpected error from send: %v", err)
				return err
			}
			// Wait forever.
			_, err := stream.Recv()
			if err == nil {
				t.Error("expected to never receive any message")
			}
			return err
		},
	}
	testpb.RegisterTestServiceServer(s1, ts)
	go s1.Serve(lis1)

	conn2Established := grpcsync.NewEvent()
	lis2, err := listenWithNotifyingListener("tcp", "localhost:0", conn2Established)
	if err != nil {
		t.Fatalf("Error while listening. Err: %v", err)
	}
	s2 := grpc.NewServer()
	defer s2.Stop()
	testpb.RegisterTestServiceServer(s2, ts)

	r := manual.NewBuilderWithScheme("whatever")
	r.InitialState(resolver.State{Addresses: []resolver.Address{
		{Addr: lis1.Addr().String()},
		{Addr: lis2.Addr().String()},
	}})
	cc, err := grpc.DialContext(ctx, r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatalf("Error creating client: %v", err)
	}
	defer cc.Close()

	client := testpb.NewTestServiceClient(cc)

	// We make a streaming RPC and do a one-message round trip to make sure
	// it's created on connection 1.
	//
	// We use a long-lived RPC because it will cause GracefulStop to send
	// GO_AWAY, but the connection doesn't get closed until the server stops and
	// the client receives the error.
	stream, err := client.FullDuplexCall(ctx)
	if err != nil {
		t.Fatalf("FullDuplexCall(_) = _, %v; want _, nil", err)
	}
	if _, err = stream.Recv(); err != nil {
		t.Fatalf("unexpected error from first recv: %v", err)
	}

	go s2.Serve(lis2)

	// Send GO_AWAY to connection 1.
	go s1.GracefulStop()

	// Wait for the ClientConn to enter IDLE state.
	state := cc.GetState()
	for ; state != connectivity.Idle && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
	}
	if state != connectivity.Idle {
		t.Fatalf("timed out waiting for IDLE channel state; last state = %v", state)
	}

	// Initiate another RPC to create another connection.
	if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
		t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
	}

	// Assert that connection 2 has been established.
	<-conn2Established.Done()

	// Close the listener for server2 to prevent it from allowing new connections.
	lis2.Close()

	// Close connection 1.
	s1.Stop()

	// Wait for client to close.
	if _, err = stream.Recv(); err == nil {
		t.Fatal("expected the stream to die, but got a successful Recv")
	}

	// Do a bunch of RPCs, make sure it stays stable. These should go to connection 2.
	for i := 0; i < 10; i++ {
		if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
			t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
		}
	}
}

func listenWithNotifyingListener(network, address string, event *grpcsync.Event) (net.Listener, error) {
	lis, err := net.Listen(network, address)
	if err != nil {
@@ -8122,61 +7566,6 @@ func (s) TestRecvWhileReturningStatus(t *testing.T) {
	}
}

// TestGoAwayStreamIDSmallerThanCreatedStreams tests the scenario where a server
// sends a goaway with a stream id that is smaller than some created streams on
// the client, while the client is simultaneously creating new streams. This
// should not induce a deadlock.
func (s) TestGoAwayStreamIDSmallerThanCreatedStreams(t *testing.T) {
	lis, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		t.Fatalf("error listening: %v", err)
	}

	ctCh := testutils.NewChannel()
	go func() {
		conn, err := lis.Accept()
		if err != nil {
			t.Errorf("error in lis.Accept(): %v", err)
		}
		ct := newClientTester(t, conn)
		ctCh.Send(ct)
	}()

	cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatalf("error dialing: %v", err)
	}
	defer cc.Close()

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	val, err := ctCh.Receive(ctx)
	if err != nil {
		t.Fatalf("timeout waiting for client transport (should be given after http2 creation)")
	}
	ct := val.(*clientTester)

	tc := testpb.NewTestServiceClient(cc)
	someStreamsCreated := grpcsync.NewEvent()
	goAwayWritten := grpcsync.NewEvent()
	go func() {
		for i := 0; i < 20; i++ {
			if i == 10 {
				<-goAwayWritten.Done()
			}
			tc.FullDuplexCall(ctx)
			if i == 4 {
				someStreamsCreated.Fire()
			}
		}
	}()

	<-someStreamsCreated.Done()
	ct.writeGoAway(1, http2.ErrCodeNo, []byte{})
	goAwayWritten.Fire()
}

type mockBinaryLogger struct {
	mml *mockMethodLogger
}

@@ -20,14 +20,25 @@ package test

import (
	"context"
	"io"
	"net"
	"strings"
	"testing"
	"time"

	"golang.org/x/net/http2"
	"google.golang.org/grpc"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/connectivity"
	"google.golang.org/grpc/credentials/insecure"
	"google.golang.org/grpc/internal"
	"google.golang.org/grpc/internal/grpcsync"
	"google.golang.org/grpc/internal/stubserver"
	"google.golang.org/grpc/internal/testutils"
	"google.golang.org/grpc/keepalive"
	"google.golang.org/grpc/resolver"
	"google.golang.org/grpc/resolver/manual"
	"google.golang.org/grpc/status"
	testpb "google.golang.org/grpc/test/grpc_testing"
)

@@ -74,3 +85,613 @@ func (s) TestGracefulClientOnGoAway(t *testing.T) {
		cancel()
	}
}

func (s) TestDetailedGoAwayErrorOnGracefulClosePropagatesToRPCError(t *testing.T) {
	rpcDoneOnClient := make(chan struct{})
	ss := &stubserver.StubServer{
		FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error {
			<-rpcDoneOnClient
			return status.Error(codes.Internal, "arbitrary status")
		},
	}
	sopts := []grpc.ServerOption{
		grpc.KeepaliveParams(keepalive.ServerParameters{
			MaxConnectionAge:      time.Millisecond * 100,
			MaxConnectionAgeGrace: time.Millisecond,
		}),
	}
	if err := ss.Start(sopts); err != nil {
		t.Fatalf("Error starting endpoint server: %v", err)
	}
	defer ss.Stop()

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	stream, err := ss.Client.FullDuplexCall(ctx)
	if err != nil {
		t.Fatalf("%v.FullDuplexCall = _, %v, want _, <nil>", ss.Client, err)
	}
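	// By now MaxConnectionAge (100ms) has expired, so the server has sent a
	// graceful GOAWAY with code NO_ERROR; the client is expected to fold that
	// GOAWAY's code and debug data into the error of the in-flight RPC below.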
const expectedErrorMessageSubstring = "received prior goaway: code: NO_ERROR"
|
||||
_, err = stream.Recv()
|
||||
close(rpcDoneOnClient)
|
||||
if err == nil || !strings.Contains(err.Error(), expectedErrorMessageSubstring) {
|
||||
t.Fatalf("%v.Recv() = _, %v, want _, rpc error containing substring: %q", stream, err, expectedErrorMessageSubstring)
|
||||
}
|
||||
}
|
||||
|
||||
func (s) TestDetailedGoAwayErrorOnAbruptClosePropagatesToRPCError(t *testing.T) {
|
||||
// set the min keepalive time very low so that this test can take
|
||||
// a reasonable amount of time
|
||||
prev := internal.KeepaliveMinPingTime
|
||||
internal.KeepaliveMinPingTime = time.Millisecond
|
||||
defer func() { internal.KeepaliveMinPingTime = prev }()
|
||||
|
||||
rpcDoneOnClient := make(chan struct{})
|
||||
ss := &stubserver.StubServer{
|
||||
FullDuplexCallF: func(stream testpb.TestService_FullDuplexCallServer) error {
|
||||
<-rpcDoneOnClient
|
||||
return status.Error(codes.Internal, "arbitrary status")
|
||||
},
|
||||
}
|
||||
sopts := []grpc.ServerOption{
|
||||
grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
|
||||
MinTime: time.Second * 1000, /* arbitrary, large value */
|
||||
}),
|
||||
}
|
||||
dopts := []grpc.DialOption{
|
||||
grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
||||
Time: time.Millisecond, /* should trigger "too many pings" error quickly */
|
||||
Timeout: time.Second * 1000, /* arbitrary, large value */
|
||||
PermitWithoutStream: false,
|
||||
}),
|
||||
}
|
||||
if err := ss.Start(sopts, dopts...); err != nil {
|
||||
t.Fatalf("Error starting endpoint server: %v", err)
|
||||
}
|
||||
defer ss.Stop()
|
||||
|
||||
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
|
||||
defer cancel()
|
||||
stream, err := ss.Client.FullDuplexCall(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("%v.FullDuplexCall = _, %v, want _, <nil>", ss.Client, err)
|
||||
}
|
||||
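	// The client pings every millisecond while the server's enforcement
	// policy allows at most one ping per 1000s, so the server quickly sends
	// GOAWAY ENHANCE_YOUR_CALM with debug data "too_many_pings" and closes
	// the connection abruptly.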
	const expectedErrorMessageSubstring = `received prior goaway: code: ENHANCE_YOUR_CALM, debug data: "too_many_pings"`
	_, err = stream.Recv()
	close(rpcDoneOnClient)
	if err == nil || !strings.Contains(err.Error(), expectedErrorMessageSubstring) {
		t.Fatalf("%v.Recv() = _, %v, want _, rpc error containing substring: |%v|", stream, err, expectedErrorMessageSubstring)
	}
}

func (s) TestClientConnCloseAfterGoAwayWithActiveStream(t *testing.T) {
	for _, e := range listTestEnv() {
		if e.name == "handler-tls" {
			continue
		}
		testClientConnCloseAfterGoAwayWithActiveStream(t, e)
	}
}

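// testClientConnCloseAfterGoAwayWithActiveStream verifies that closing the
// ClientConn while the server's GracefulStop is blocked on an active stream
// still allows GracefulStop to return promptly.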
func testClientConnCloseAfterGoAwayWithActiveStream(t *testing.T, e env) {
	te := newTest(t, e)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()
	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if _, err := tc.FullDuplexCall(ctx); err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want _, <nil>", tc, err)
	}
	done := make(chan struct{})
	go func() {
		te.srv.GracefulStop()
		close(done)
	}()
	time.Sleep(50 * time.Millisecond)
	cc.Close()
	timeout := time.NewTimer(time.Second)
	select {
	case <-done:
	case <-timeout.C:
		t.Fatalf("Test timed-out.")
	}
}

func (s) TestServerGoAway(t *testing.T) {
	for _, e := range listTestEnv() {
		if e.name == "handler-tls" {
			continue
		}
		testServerGoAway(t, e)
	}
}

func testServerGoAway(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	// Finish an RPC to make sure the connection is good.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
	}
	ch := make(chan struct{})
	go func() {
		te.srv.GracefulStop()
		close(ch)
	}()
	// Loop until the server side GoAway signal is propagated to the client.
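	// A short 10ms EmptyCall that fails with anything other than
	// DeadlineExceeded means the client has processed the GOAWAY and stopped
	// using this connection.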
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); err != nil && status.Code(err) != codes.DeadlineExceeded {
			cancel()
			break
		}
		cancel()
	}
	// A new RPC should fail.
	ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}); status.Code(err) != codes.Unavailable && status.Code(err) != codes.Internal {
		t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, %s or %s", err, codes.Unavailable, codes.Internal)
	}
	<-ch
	awaitNewConnLogOutput()
}

func (s) TestServerGoAwayPendingRPC(t *testing.T) {
	for _, e := range listTestEnv() {
		if e.name == "handler-tls" {
			continue
		}
		testServerGoAwayPendingRPC(t, e)
	}
}

func testServerGoAwayPendingRPC(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
	)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	// Finish an RPC to make sure the connection is good.
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
	}
	ch := make(chan struct{})
	go func() {
		te.srv.GracefulStop()
		close(ch)
	}()
	// Loop until the server side GoAway signal is propagated to the client.
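	// On a healthy connection the 10ms EmptyCall succeeds; once the GOAWAY is
	// processed, the WaitForReady call blocks waiting for a new transport and
	// runs into its deadline, so any error within the 1s window is taken as
	// the GOAWAY having arrived.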
	start := time.Now()
	errored := false
	for time.Since(start) < time.Second {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
		_, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true))
		cancel()
		if err != nil {
			errored = true
			break
		}
	}
	if !errored {
		t.Fatalf("GoAway never received by client")
	}
	respParam := []*testpb.ResponseParameters{{Size: 1}}
	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
	if err != nil {
		t.Fatal(err)
	}
	req := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
		Payload:            payload,
	}
	// The existing RPC should be still good to proceed.
	if err := stream.Send(req); err != nil {
		t.Fatalf("%v.Send(_) = %v, want <nil>", stream, err)
	}
	if _, err := stream.Recv(); err != nil {
		t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
	}
	// The RPC will run until canceled.
	cancel()
	<-ch
	awaitNewConnLogOutput()
}

func (s) TestServerMultipleGoAwayPendingRPC(t *testing.T) {
	for _, e := range listTestEnv() {
		if e.name == "handler-tls" {
			continue
		}
		testServerMultipleGoAwayPendingRPC(t, e)
	}
}

func testServerMultipleGoAwayPendingRPC(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
	)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	ctx, cancel := context.WithCancel(context.Background())
	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}
	// Finish an RPC to make sure the connection is good.
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
	}
	ch1 := make(chan struct{})
	go func() {
		te.srv.GracefulStop()
		close(ch1)
	}()
	ch2 := make(chan struct{})
	go func() {
		te.srv.GracefulStop()
		close(ch2)
	}()
	// Loop until the server side GoAway signal is propagated to the client.
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
			cancel()
			break
		}
		cancel()
	}
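	// Neither GracefulStop call may have returned yet: the FullDuplexCall
	// stream is still open, and GracefulStop blocks until all active RPCs
	// complete.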
	select {
	case <-ch1:
		t.Fatal("GracefulStop() terminated early")
	case <-ch2:
		t.Fatal("GracefulStop() terminated early")
	default:
	}
	respParam := []*testpb.ResponseParameters{
		{
			Size: 1,
		},
	}
	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
	if err != nil {
		t.Fatal(err)
	}
	req := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
		Payload:            payload,
	}
	// The existing RPC should be still good to proceed.
	if err := stream.Send(req); err != nil {
		t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
	}
	if _, err := stream.Recv(); err != nil {
		t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
	}
	if err := stream.CloseSend(); err != nil {
		t.Fatalf("%v.CloseSend() = %v, want <nil>", stream, err)
	}
	<-ch1
	<-ch2
	cancel()
	awaitNewConnLogOutput()
}

func (s) TestConcurrentClientConnCloseAndServerGoAway(t *testing.T) {
	for _, e := range listTestEnv() {
		if e.name == "handler-tls" {
			continue
		}
		testConcurrentClientConnCloseAndServerGoAway(t, e)
	}
}

func testConcurrentClientConnCloseAndServerGoAway(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
	)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
	}
	ch := make(chan struct{})
	// Close ClientConn and Server concurrently.
	go func() {
		te.srv.GracefulStop()
		close(ch)
	}()
	go func() {
		cc.Close()
	}()
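	// GracefulStop must return even while the ClientConn is being closed
	// concurrently; a deadlock here would hang the test.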
	<-ch
}

func (s) TestConcurrentServerStopAndGoAway(t *testing.T) {
	for _, e := range listTestEnv() {
		if e.name == "handler-tls" {
			continue
		}
		testConcurrentServerStopAndGoAway(t, e)
	}
}

func testConcurrentServerStopAndGoAway(t *testing.T, e env) {
	te := newTest(t, e)
	te.userAgent = testAppUA
	te.declareLogNoise(
		"transport: http2Client.notifyError got notified that the client transport was broken EOF",
		"grpc: addrConn.transportMonitor exits due to: grpc: the connection is closing",
		"grpc: addrConn.resetTransport failed to create client transport: connection error",
	)
	te.startServer(&testServer{security: e.security})
	defer te.tearDown()

	cc := te.clientConn()
	tc := testpb.NewTestServiceClient(cc)
	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()
	stream, err := tc.FullDuplexCall(ctx, grpc.WaitForReady(true))
	if err != nil {
		t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
	}

	// Finish an RPC to make sure the connection is good.
	if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
		t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
	}

	ch := make(chan struct{})
	go func() {
		te.srv.GracefulStop()
		close(ch)
	}()
	// Loop until the server side GoAway signal is propagated to the client.
	for {
		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
		if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
			cancel()
			break
		}
		cancel()
	}
	// Stop the server and close all the connections.
	te.srv.Stop()
	respParam := []*testpb.ResponseParameters{
		{
			Size: 1,
		},
	}
	payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
	if err != nil {
		t.Fatal(err)
	}
	req := &testpb.StreamingOutputCallRequest{
		ResponseType:       testpb.PayloadType_COMPRESSABLE,
		ResponseParameters: respParam,
		Payload:            payload,
	}
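	// Stop() has torn down the transport, so Send on the now-dead stream is
	// expected to start returning io.EOF within the 2s budget below.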
	sendStart := time.Now()
	for {
		if err := stream.Send(req); err == io.EOF {
			// stream.Send should eventually send io.EOF
			break
		} else if err != nil {
			// Send should never return a transport-level error.
			t.Fatalf("stream.Send(%v) = %v; want <nil or io.EOF>", req, err)
		}
		if time.Since(sendStart) > 2*time.Second {
			t.Fatalf("stream.Send(_) did not return io.EOF after 2s")
		}
		time.Sleep(time.Millisecond)
	}
	if _, err := stream.Recv(); err == nil || err == io.EOF {
		t.Fatalf("%v.Recv() = _, %v, want _, <non-nil, non-EOF>", stream, err)
	}
	<-ch
	awaitNewConnLogOutput()
}

// Proxies typically send GO_AWAY followed by connection closure a minute or so later. This
// test ensures that the connection is re-created after GO_AWAY and not affected by the
// subsequent (old) connection closure.
func (s) TestGoAwayThenClose(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()

	lis1, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		t.Fatalf("Error while listening. Err: %v", err)
	}
	s1 := grpc.NewServer()
	defer s1.Stop()
	ts := &funcServer{
		unaryCall: func(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
			return &testpb.SimpleResponse{}, nil
		},
		fullDuplexCall: func(stream testpb.TestService_FullDuplexCallServer) error {
			if err := stream.Send(&testpb.StreamingOutputCallResponse{}); err != nil {
				t.Errorf("unexpected error from send: %v", err)
				return err
			}
			// Wait forever.
			_, err := stream.Recv()
			if err == nil {
				t.Error("expected to never receive any message")
			}
			return err
		},
	}
	testpb.RegisterTestServiceServer(s1, ts)
	go s1.Serve(lis1)

	conn2Established := grpcsync.NewEvent()
	lis2, err := listenWithNotifyingListener("tcp", "localhost:0", conn2Established)
	if err != nil {
		t.Fatalf("Error while listening. Err: %v", err)
	}
	s2 := grpc.NewServer()
	defer s2.Stop()
	testpb.RegisterTestServiceServer(s2, ts)

	r := manual.NewBuilderWithScheme("whatever")
	r.InitialState(resolver.State{Addresses: []resolver.Address{
		{Addr: lis1.Addr().String()},
		{Addr: lis2.Addr().String()},
	}})
	cc, err := grpc.DialContext(ctx, r.Scheme()+":///", grpc.WithResolvers(r), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatalf("Error creating client: %v", err)
	}
	defer cc.Close()

	client := testpb.NewTestServiceClient(cc)

	// We make a streaming RPC and do a one-message round trip to make sure
	// it's created on connection 1.
	//
	// We use a long-lived RPC because it will cause GracefulStop to send
	// GO_AWAY, but the connection doesn't get closed until the server stops and
	// the client receives the error.
	stream, err := client.FullDuplexCall(ctx)
	if err != nil {
		t.Fatalf("FullDuplexCall(_) = _, %v; want _, nil", err)
	}
	if _, err = stream.Recv(); err != nil {
		t.Fatalf("unexpected error from first recv: %v", err)
	}

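	// Server 2 only starts serving now, so the stream above is guaranteed to
	// live on connection 1.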
	go s2.Serve(lis2)

	// Send GO_AWAY to connection 1.
	go s1.GracefulStop()

	// Wait for the ClientConn to enter IDLE state.
	state := cc.GetState()
	for ; state != connectivity.Idle && cc.WaitForStateChange(ctx, state); state = cc.GetState() {
	}
	if state != connectivity.Idle {
		t.Fatalf("timed out waiting for IDLE channel state; last state = %v", state)
	}

	// Initiate another RPC to create another connection.
	if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
		t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
	}

	// Assert that connection 2 has been established.
	<-conn2Established.Done()

	// Close the listener for server2 to prevent it from allowing new connections.
	lis2.Close()

	// Close connection 1.
	s1.Stop()

	// Wait for client to close.
	if _, err = stream.Recv(); err == nil {
		t.Fatal("expected the stream to die, but got a successful Recv")
	}

	// Do a bunch of RPCs, make sure it stays stable. These should go to connection 2.
	for i := 0; i < 10; i++ {
		if _, err := client.UnaryCall(ctx, &testpb.SimpleRequest{}); err != nil {
			t.Fatalf("UnaryCall(_) = _, %v; want _, nil", err)
		}
	}
}

// TestGoAwayStreamIDSmallerThanCreatedStreams tests the scenario where a server
// sends a goaway with a stream id that is smaller than some created streams on
// the client, while the client is simultaneously creating new streams. This
// should not induce a deadlock.
func (s) TestGoAwayStreamIDSmallerThanCreatedStreams(t *testing.T) {
	lis, err := net.Listen("tcp", "localhost:0")
	if err != nil {
		t.Fatalf("error listening: %v", err)
	}

	ctCh := testutils.NewChannel()
	go func() {
		conn, err := lis.Accept()
		if err != nil {
			t.Errorf("error in lis.Accept(): %v", err)
		}
		ct := newClientTester(t, conn)
		ctCh.Send(ct)
	}()

	cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		t.Fatalf("error dialing: %v", err)
	}
	defer cc.Close()

	ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
	defer cancel()

	val, err := ctCh.Receive(ctx)
	if err != nil {
		t.Fatalf("timeout waiting for client transport (should be given after http2 creation)")
	}
	ct := val.(*clientTester)

	tc := testpb.NewTestServiceClient(cc)
	someStreamsCreated := grpcsync.NewEvent()
	goAwayWritten := grpcsync.NewEvent()
	go func() {
		for i := 0; i < 20; i++ {
			if i == 10 {
				<-goAwayWritten.Done()
			}
			tc.FullDuplexCall(ctx)
			if i == 4 {
				someStreamsCreated.Fire()
			}
		}
	}()

	<-someStreamsCreated.Done()
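	// Write a GOAWAY whose last-stream-ID (1) is below stream IDs the client
	// has already opened; the client must unwind those streams without
	// deadlocking against the goroutine that is still creating new ones.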
	ct.writeGoAway(1, http2.ErrCodeNo, []byte{})
	goAwayWritten.Fire()
}