Tune transport Monitor, modify the test and add more comments

This commit is contained in:
iamqizhao 2016-07-28 11:07:42 -07:00
parent 1faf2ca61b
commit cd4ca4d808
2 changed files with 15 additions and 40 deletions

View File

@ -68,7 +68,7 @@ var (
// errCredentialsConflict indicates that grpc.WithTransportCredentials()
// and grpc.WithInsecure() are both called for a connection.
errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)")
// errNetworkIP indicates that the connection is down due to some network I/O error.
// errNetworkIO indicates that the connection is down due to some network I/O error.
errNetworkIO = errors.New("grpc: failed with network I/O error")
// errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs.
errConnDrain = errors.New("grpc: the connection is drained")
@ -631,13 +631,18 @@ func (ac *addrConn) transportMonitor() {
}
return
case <-t.GoAway():
// If GoAway happens without any network I/O error, ac is closed without shutting down the
// underlying transport (the transport will be closed when all the pending RPCs finished or
// failed.).
// If GoAway and some network I/O error happen concurrently, ac and its underlying transport
// are closed.
// In both cases, a new ac is created.
select {
case <-t.Error():
t.Close()
return
ac.tearDown(errNetworkIO)
default:
ac.tearDown(errConnDrain)
}
ac.tearDown(errConnDrain)
ac.cc.newAddrConn(ac.addr, true)
return
case <-t.Error():
@ -646,7 +651,8 @@ func (ac *addrConn) transportMonitor() {
t.Close()
return
case <-t.GoAway():
t.Close()
ac.tearDown(errNetworkIO)
ac.cc.newAddrConn(ac.addr, true)
return
default:
}

View File

@ -710,50 +710,19 @@ func testConcurrentClientConnCloseAndServerGoAway(t *testing.T, e env) {
cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc)
stream, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false))
if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
}
// Finish an RPC to make sure the connection is good.
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, <nil>", tc, err)
}
ch := make(chan struct{})
// Close ClientConn and Server concurrently.
go func() {
te.srv.GracefulStop()
close(ch)
}()
// Loop until the server side GoAway signal is propagated to the client.
for {
ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond)
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil {
continue
}
break
}
// Stop the server and close all the connections.
te.srv.Stop()
respParam := []*testpb.ResponseParameters{
{
Size: proto.Int32(1),
},
}
payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
if err != nil {
t.Fatal(err)
}
req := &testpb.StreamingOutputCallRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseParameters: respParam,
Payload: payload,
}
if err := stream.Send(req); err == nil {
if _, err := stream.Recv(); err == nil {
t.Fatalf("%v.Recv() = _, %v, want _, <nil>", stream, err)
}
}
go func() {
cc.Close()
}()
<-ch
awaitNewConnLogOutput()
}
func TestConcurrentServerStopAndGoAway(t *testing.T) {