From 0c58a17d6fc3b1cfbc9450056ecc5e9d229b8736 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Mon, 22 Aug 2016 16:44:28 -0700 Subject: [PATCH 1/6] Add credentials ErrConnDispatch --- credentials/credentials.go | 7 ++++++ server.go | 5 +++- test/end2end_test.go | 48 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 59 insertions(+), 1 deletion(-) diff --git a/credentials/credentials.go b/credentials/credentials.go index 3f17b7062..13be45742 100644 --- a/credentials/credentials.go +++ b/credentials/credentials.go @@ -40,6 +40,7 @@ package credentials // import "google.golang.org/grpc/credentials" import ( "crypto/tls" "crypto/x509" + "errors" "fmt" "io/ioutil" "net" @@ -86,6 +87,12 @@ type AuthInfo interface { AuthType() string } +var ( + // ErrConnDispatched indicates that rawConn has been dispatched out of gRPC + // and the caller should not close rawConn. + ErrConnDispatched = errors.New("credentials: rawConn is dispatched out of gRPC") +) + // TransportCredentials defines the common interface for all the live gRPC wire // protocols and supported transport security protocols (e.g., TLS, SSL). type TransportCredentials interface { diff --git a/server.go b/server.go index 1ed8aac9e..b2a825ad0 100644 --- a/server.go +++ b/server.go @@ -367,7 +367,10 @@ func (s *Server) handleRawConn(rawConn net.Conn) { s.errorf("ServerHandshake(%q) failed: %v", rawConn.RemoteAddr(), err) s.mu.Unlock() grpclog.Printf("grpc: Server.Serve failed to complete security handshake from %q: %v", rawConn.RemoteAddr(), err) - rawConn.Close() + // If serverHandShake returns ErrConnDispatched, keep rawConn open. + if err != credentials.ErrConnDispatched { + rawConn.Close() + } return } diff --git a/test/end2end_test.go b/test/end2end_test.go index c76c58b1c..09d389714 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -2349,6 +2349,54 @@ func TestNonFailFastRPCWithNoBalancerErrorOnBadCertificates(t *testing.T) { } } +type serverDispatchCred struct { + ready chan struct{} + rawConn net.Conn +} + +func newServerDispatchCred() *serverDispatchCred { + return &serverDispatchCred{ + ready: make(chan struct{}), + } +} +func (c *serverDispatchCred) ClientHandshake(ctx context.Context, addr string, rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { + return rawConn, nil, nil +} +func (c *serverDispatchCred) ServerHandshake(rawConn net.Conn) (net.Conn, credentials.AuthInfo, error) { + c.rawConn = rawConn + close(c.ready) + return nil, nil, credentials.ErrConnDispatched +} +func (c *serverDispatchCred) Info() credentials.ProtocolInfo { + return credentials.ProtocolInfo{} +} +func (c *serverDispatchCred) getRawConn() net.Conn { + <-c.ready + return c.rawConn +} + +func TestServerCredsDispatch(t *testing.T) { + lis, err := net.Listen("tcp", ":0") + if err != nil { + t.Fatalf("Failed to listen: %v", err) + } + cred := newServerDispatchCred() + s := grpc.NewServer(grpc.Creds(cred)) + go s.Serve(lis) + defer s.Stop() + + cc, err := grpc.Dial(lis.Addr().String(), grpc.WithTransportCredentials(cred)) + if err != nil { + t.Fatalf("grpc.Dial(%q) = %v", lis.Addr().String(), err) + } + defer cc.Close() + + // Check rawConn is not closed. + if n, err := cred.getRawConn().Write([]byte{0}); n <= 0 || err != nil { + t.Errorf("Read() = %v, %v; want n>0, ", n, err) + } +} + // interestingGoroutines returns all goroutines we care about for the purpose // of leak checking. It excludes testing or runtime ones. func interestingGoroutines() (gs []string) { From 152c95e0d83723d61cc1355d086146f435a6910f Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 24 Aug 2016 16:34:37 -0700 Subject: [PATCH 2/6] add err logging to testFailFast --- test/end2end_test.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index c76c58b1c..e62297e20 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -836,9 +836,11 @@ func testFailFast(t *testing.T, e env) { te.srv.Stop() // Loop until the server teardown is propagated to the client. for { - if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); grpc.Code(err) == codes.Unavailable { + _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}) + if grpc.Code(err) == codes.Unavailable { break } + grpclog.Println("%v.EmptyCall(_, _) = _, %v", err) time.Sleep(10 * time.Millisecond) } // The client keeps reconnecting and ongoing fail-fast RPCs should fail with code.Unavailable. From dd5645bebff44f6b88780bb949022a09eadd7dae Mon Sep 17 00:00:00 2001 From: Tamir Duberstein Date: Thu, 25 Aug 2016 15:18:19 -0400 Subject: [PATCH 3/6] Avoid goroutine leak in clientconn Prior to this change, it was possible for `DialContext` to return `(nil, err)` without properly closing the `ClientConn`, resulting in an unavoidable leak of the `resetAddrConn` goroutine. --- clientconn.go | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/clientconn.go b/clientconn.go index ccaff94a2..1d3b46c60 100644 --- a/clientconn.go +++ b/clientconn.go @@ -234,13 +234,13 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * defer func() { select { case <-ctx.Done(): - if conn != nil { - conn.Close() - } - conn = nil - err = ctx.Err() + conn, err = nil, ctx.Err() default: } + + if err != nil { + cc.Close() + } }() for _, opt := range opts { @@ -296,11 +296,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * return nil, ctx.Err() case err := <-waitC: if err != nil { - cc.Close() return nil, err } case <-timeoutCh: - cc.Close() return nil, ErrClientConnTimeout } // If balancer is nil or balancer.Notify() is nil, ok will be false here. From 4c15c984f2315613aefcd75dc23e8da4d4444d25 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 25 Aug 2016 15:04:02 -0700 Subject: [PATCH 4/6] change to Printf --- test/end2end_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index e62297e20..0f89af5dc 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -840,7 +840,7 @@ func testFailFast(t *testing.T, e env) { if grpc.Code(err) == codes.Unavailable { break } - grpclog.Println("%v.EmptyCall(_, _) = _, %v", err) + grpclog.Printf("%v.EmptyCall(_, _) = _, %v", err) time.Sleep(10 * time.Millisecond) } // The client keeps reconnecting and ongoing fail-fast RPCs should fail with code.Unavailable. From 42e031a928289bf1b6c41559727b514d295752eb Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 25 Aug 2016 15:13:27 -0700 Subject: [PATCH 5/6] Use fmt instead grpclog --- test/end2end_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 0f89af5dc..b4220271c 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -840,7 +840,7 @@ func testFailFast(t *testing.T, e env) { if grpc.Code(err) == codes.Unavailable { break } - grpclog.Printf("%v.EmptyCall(_, _) = _, %v", err) + fmt.Printf("%v.EmptyCall(_, _) = _, %v", err) time.Sleep(10 * time.Millisecond) } // The client keeps reconnecting and ongoing fail-fast RPCs should fail with code.Unavailable. From 935fb215d67f361cebc97e641add3741464544da Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 25 Aug 2016 16:30:04 -0700 Subject: [PATCH 6/6] fix --- test/end2end_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index b4220271c..b26442f2e 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -840,7 +840,7 @@ func testFailFast(t *testing.T, e env) { if grpc.Code(err) == codes.Unavailable { break } - fmt.Printf("%v.EmptyCall(_, _) = _, %v", err) + fmt.Printf("%v.EmptyCall(_, _) = _, %v", tc, err) time.Sleep(10 * time.Millisecond) } // The client keeps reconnecting and ongoing fail-fast RPCs should fail with code.Unavailable.