From 351e2d02977f20df4393a6e5747420361ad59770 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Tue, 3 Mar 2015 19:04:29 -0800 Subject: [PATCH] add more tests for dial timeout and fix some bugs --- clientconn.go | 44 +++++++++++++++++++++++++++++++------------- test/end2end_test.go | 39 ++++++++++++++++++++++++++++++++++++++- 2 files changed, 69 insertions(+), 14 deletions(-) diff --git a/clientconn.go b/clientconn.go index 596bb7044..473837965 100644 --- a/clientconn.go +++ b/clientconn.go @@ -52,7 +52,7 @@ var ( // the session is closing. ErrClientConnClosing = errors.New("grpc: the client connection is closing") // ErrClientConnTimeout indicates that the connection could not be - // established within the specified timeout. + // established or re-established within the specified timeout. ErrClientConnTimeout = errors.New("grpc: timed out trying to connect") ) @@ -112,7 +112,8 @@ type ClientConn struct { shutdownChan chan struct{} mu sync.Mutex - // Is closed and becomes nil when a new transport is up. + // ready is closed and becomes nil when a new transport is up or failed + // due to timeout. ready chan struct{} // Indicates the ClientConn is under destruction. closing bool @@ -141,19 +142,28 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error { t.Close() } // Adjust timeout for the current try. - if cc.dopts.Timeout > 0 { - cc.dopts.Timeout -= time.Since(start) - if cc.dopts.Timeout <= 0 { + dopts := cc.dopts + if dopts.Timeout > 0 { + dopts.Timeout -= time.Since(start) + if dopts.Timeout <= 0 { + cc.Close() return ErrClientConnTimeout } } - newTransport, err := transport.NewClientTransport(cc.target, cc.dopts) + newTransport, err := transport.NewClientTransport(cc.target, dopts) if err != nil { if netErr, ok := err.(net.Error); ok && netErr.Timeout() { + cc.Close() + return ErrClientConnTimeout + } + sleepTime := backoff(retries) + // Fail early before falling into sleep. + if dopts.Timeout > 0 && dopts.Timeout < sleepTime + time.Since(start) { + cc.Close() return ErrClientConnTimeout } closeTransport = false - time.Sleep(backoff(retries)) + time.Sleep(sleepTime) retries++ // TODO(zhaoq): Record the error with glog.V. log.Printf("grpc: ClientConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, cc.target) @@ -183,6 +193,8 @@ func (cc *ClientConn) transportMonitor() { case <-cc.transport.Error(): if err := cc.resetTransport(true); err != nil { // The channel is closing. + // TODO(zhaoq): Record the error with glog.V. + log.Printf("grpc: transport exits due to %v", err) return } continue @@ -214,24 +226,30 @@ func (cc *ClientConn) wait(ctx context.Context, ts int) (transport.ClientTranspo select { case <-ctx.Done(): return nil, 0, transport.ContextErr(ctx.Err()) - // Wait until the new transport is ready. + // Wait until the new transport is ready or failed. case <-ready: } } } } -// Close starts to tear down the ClientConn. +// Close starts to tear down the ClientConn. Returns ErrClientConnClosing if +// it has been closed (mostly due to dial time-out). // TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in // some edge cases (e.g., the caller opens and closes many ClientConn's in a // tight loop. -func (cc *ClientConn) Close() { +func (cc *ClientConn) Close() error { cc.mu.Lock() defer cc.mu.Unlock() if cc.closing { - return + return ErrClientConnClosing } cc.closing = true - cc.transport.Close() - close(cc.shutdownChan) + if cc.transport != nil { + cc.transport.Close() + } + if cc.shutdownChan != nil { + close(cc.shutdownChan) + } + return nil } diff --git a/test/end2end_test.go b/test/end2end_test.go index 7e59e140c..5d9000ccc 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -203,6 +203,43 @@ func TestDialTimeout(t *testing.T) { } } +func TestTLSDialTimeout(t *testing.T) { + creds, err := credentials.NewClientTLSFromFile(tlsDir+"ca.pem", "x.test.youtube.com") + if err != nil { + t.Fatalf("Failed to create credentials %v", err) + } + conn, err := grpc.Dial("Non-Existent.Server:80", grpc.WithTransportCredentials(creds), grpc.WithTimeout(time.Millisecond)) + if err == nil { + conn.Close() + } + if err != grpc.ErrClientConnTimeout { + t.Fatalf("grpc.Dial(_, _) = %v, %v, want %v", conn, err, grpc.ErrClientConnTimeout) + } +} + +func TestReconnectTimeout(t *testing.T) { + lis, err := net.Listen("tcp", ":0") + if err != nil { + t.Fatalf("Failed to listen: %v", err) + } + _, port, err := net.SplitHostPort(lis.Addr().String()) + if err != nil { + t.Fatalf("Failed to parse listener address: %v", err) + } + addr := "localhost:" + port + timeOut := time.Second + conn, err := grpc.Dial(addr, grpc.WithTimeout(timeOut)) + if err != nil { + t.Fatalf("Failed to dial to the server %q: %v", addr, err) + } + lis.Close() + // Sleep till reconnect times out. + time.Sleep(2 * timeOut) + if err := conn.Close(); err != grpc.ErrClientConnClosing { + t.Fatalf("%v.Close() = %v, want %v", conn, err, grpc.ErrClientConnClosing) + } +} + func setUp(useTLS bool, maxStream uint32) (s *grpc.Server, tc testpb.TestServiceClient) { lis, err := net.Listen("tcp", ":0") if err != nil { @@ -341,7 +378,7 @@ func TestRetry(t *testing.T) { } // TODO(zhaoq): Have a better test coverage of timeout and cancellation mechanism. -func TestTimeout(t *testing.T) { +func TestRPCTimeout(t *testing.T) { s, tc := setUp(true, math.MaxUint32) defer s.Stop() argSize := 2718