add more tests for dial timeout and fix some bugs

This commit is contained in:
iamqizhao 2015-03-03 19:04:29 -08:00
parent fa215c2d4b
commit 351e2d0297
2 changed files with 69 additions and 14 deletions

View File

@ -52,7 +52,7 @@ var (
// the session is closing.
ErrClientConnClosing = errors.New("grpc: the client connection is closing")
// ErrClientConnTimeout indicates that the connection could not be
// established within the specified timeout.
// established or re-established within the specified timeout.
ErrClientConnTimeout = errors.New("grpc: timed out trying to connect")
)
@ -112,7 +112,8 @@ type ClientConn struct {
shutdownChan chan struct{}
mu sync.Mutex
// Is closed and becomes nil when a new transport is up.
// ready is closed and becomes nil when a new transport is up or failed
// due to timeout.
ready chan struct{}
// Indicates the ClientConn is under destruction.
closing bool
@ -141,19 +142,28 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error {
t.Close()
}
// Adjust timeout for the current try.
if cc.dopts.Timeout > 0 {
cc.dopts.Timeout -= time.Since(start)
if cc.dopts.Timeout <= 0 {
dopts := cc.dopts
if dopts.Timeout > 0 {
dopts.Timeout -= time.Since(start)
if dopts.Timeout <= 0 {
cc.Close()
return ErrClientConnTimeout
}
}
newTransport, err := transport.NewClientTransport(cc.target, cc.dopts)
newTransport, err := transport.NewClientTransport(cc.target, dopts)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
cc.Close()
return ErrClientConnTimeout
}
sleepTime := backoff(retries)
// Fail early before falling into sleep.
if dopts.Timeout > 0 && dopts.Timeout < sleepTime + time.Since(start) {
cc.Close()
return ErrClientConnTimeout
}
closeTransport = false
time.Sleep(backoff(retries))
time.Sleep(sleepTime)
retries++
// TODO(zhaoq): Record the error with glog.V.
log.Printf("grpc: ClientConn.resetTransport failed to create client transport: %v; Reconnecting to %q", err, cc.target)
@ -183,6 +193,8 @@ func (cc *ClientConn) transportMonitor() {
case <-cc.transport.Error():
if err := cc.resetTransport(true); err != nil {
// The channel is closing.
// TODO(zhaoq): Record the error with glog.V.
log.Printf("grpc: transport exits due to %v", err)
return
}
continue
@ -214,24 +226,30 @@ func (cc *ClientConn) wait(ctx context.Context, ts int) (transport.ClientTranspo
select {
case <-ctx.Done():
return nil, 0, transport.ContextErr(ctx.Err())
// Wait until the new transport is ready.
// Wait until the new transport is ready or failed.
case <-ready:
}
}
}
}
// Close starts to tear down the ClientConn.
// Close starts to tear down the ClientConn. Returns ErrClientConnClosing if
// it has been closed (mostly due to dial time-out).
// TODO(zhaoq): Make this synchronous to avoid unbounded memory consumption in
// some edge cases (e.g., the caller opens and closes many ClientConn's in a
// tight loop.
func (cc *ClientConn) Close() {
func (cc *ClientConn) Close() error {
cc.mu.Lock()
defer cc.mu.Unlock()
if cc.closing {
return
return ErrClientConnClosing
}
cc.closing = true
cc.transport.Close()
close(cc.shutdownChan)
if cc.transport != nil {
cc.transport.Close()
}
if cc.shutdownChan != nil {
close(cc.shutdownChan)
}
return nil
}

View File

@ -203,6 +203,43 @@ func TestDialTimeout(t *testing.T) {
}
}
func TestTLSDialTimeout(t *testing.T) {
creds, err := credentials.NewClientTLSFromFile(tlsDir+"ca.pem", "x.test.youtube.com")
if err != nil {
t.Fatalf("Failed to create credentials %v", err)
}
conn, err := grpc.Dial("Non-Existent.Server:80", grpc.WithTransportCredentials(creds), grpc.WithTimeout(time.Millisecond))
if err == nil {
conn.Close()
}
if err != grpc.ErrClientConnTimeout {
t.Fatalf("grpc.Dial(_, _) = %v, %v, want %v", conn, err, grpc.ErrClientConnTimeout)
}
}
func TestReconnectTimeout(t *testing.T) {
lis, err := net.Listen("tcp", ":0")
if err != nil {
t.Fatalf("Failed to listen: %v", err)
}
_, port, err := net.SplitHostPort(lis.Addr().String())
if err != nil {
t.Fatalf("Failed to parse listener address: %v", err)
}
addr := "localhost:" + port
timeOut := time.Second
conn, err := grpc.Dial(addr, grpc.WithTimeout(timeOut))
if err != nil {
t.Fatalf("Failed to dial to the server %q: %v", addr, err)
}
lis.Close()
// Sleep till reconnect times out.
time.Sleep(2 * timeOut)
if err := conn.Close(); err != grpc.ErrClientConnClosing {
t.Fatalf("%v.Close() = %v, want %v", conn, err, grpc.ErrClientConnClosing)
}
}
func setUp(useTLS bool, maxStream uint32) (s *grpc.Server, tc testpb.TestServiceClient) {
lis, err := net.Listen("tcp", ":0")
if err != nil {
@ -341,7 +378,7 @@ func TestRetry(t *testing.T) {
}
// TODO(zhaoq): Have a better test coverage of timeout and cancellation mechanism.
func TestTimeout(t *testing.T) {
func TestRPCTimeout(t *testing.T) {
s, tc := setUp(true, math.MaxUint32)
defer s.Stop()
argSize := 2718