From 35c3afad17f705e725ca04bff658932689920154 Mon Sep 17 00:00:00 2001
From: Jean de Klerk
Date: Thu, 20 Sep 2018 15:45:40 -0700
Subject: [PATCH] Transport refactor (#2305)

internal: remove transportMonitor, replace with callbacks

This refactors the internal http2 transport to use callbacks instead of
continuously monitoring the transport in a separate goroutine. This has
several advantages:

- Fewer goroutines.
- Less complexity: synchronous callbacks are much easier to reason about
  than asynchronous monitoring goroutines.
- Callbacks: these provide definitive locations for monitoring the creation
  and closure of a transport, paving the way for GracefulStop.

This CL also consolidates all the logic about backoff and iterating through
the list of addresses into a single method.
---
 Makefile                             |   4 +-
 clientconn.go                        | 535 ++++++++++++++-------------
 internal/transport/http2_client.go   |  33 +-
 internal/transport/transport.go      |   4 +-
 internal/transport/transport_test.go |  62 ++--
 status/status.go                     |   8 +-
 6 files changed, 343 insertions(+), 303 deletions(-)

diff --git a/Makefile b/Makefile
index eb2d2a7cf..41a754f97 100644
--- a/Makefile
+++ b/Makefile
@@ -17,10 +17,10 @@ proto:
 	go generate google.golang.org/grpc/...
 
 test: testdeps
-	go test -cpu 1,4 -timeout 5m google.golang.org/grpc/...
+	go test -cpu 1,4 -timeout 7m google.golang.org/grpc/...
 
 testappengine: testappenginedeps
-	goapp test -cpu 1,4 -timeout 5m google.golang.org/grpc/...
+	goapp test -cpu 1,4 -timeout 7m google.golang.org/grpc/...
 
 testappenginedeps:
 	goapp get -d -v -t -tags 'appengine appenginevm' google.golang.org/grpc/...

diff --git a/clientconn.go b/clientconn.go
index c4e896f82..13d21c8af 100644
--- a/clientconn.go
+++ b/clientconn.go
@@ -562,11 +562,12 @@ func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivi
 // Caller needs to make sure len(addrs) > 0.
 func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) {
 	ac := &addrConn{
-		cc:           cc,
-		addrs:        addrs,
-		dopts:        cc.dopts,
-		czData:       new(channelzData),
-		resetBackoff: make(chan struct{}),
+		cc:                  cc,
+		addrs:               addrs,
+		dopts:               cc.dopts,
+		czData:              new(channelzData),
+		successfulHandshake: true, // make the first nextAddr() call _not_ move addrIdx up by 1
+		resetBackoff:        make(chan struct{}),
 	}
 	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
 	// Track ac in cc. This needs to be done before any getTransport(...) is called.
@@ -654,17 +655,7 @@ func (ac *addrConn) connect() error {
 	ac.mu.Unlock()
 
 	// Start a goroutine connecting to the server asynchronously.
-	go func() {
-		if err := ac.resetTransport(); err != nil {
-			grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err)
-			if err != errConnClosing {
-				// Keep this ac in cc.conns, to get the reason it's torn down.
-				ac.tearDown(err)
-			}
-			return
-		}
-		ac.transportMonitor()
-	}()
+	go ac.resetTransport(false)
 	return nil
 }
 
@@ -693,7 +684,7 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
 	grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
 	if curAddrFound {
 		ac.addrs = addrs
-		ac.reconnectIdx = 0 // Start reconnecting from beginning in the new list.
+		ac.addrIdx = 0 // Start reconnecting from beginning in the new list.
} return curAddrFound @@ -867,27 +858,28 @@ type addrConn struct { cancel context.CancelFunc cc *ClientConn - addrs []resolver.Address dopts dialOptions events trace.EventLog acbw balancer.SubConn - mu sync.Mutex - curAddr resolver.Address - reconnectIdx int // The index in addrs list to start reconnecting from. + transport transport.ClientTransport // The current transport. + + mu sync.Mutex + addrIdx int // The index in addrs list to start reconnecting from. + curAddr resolver.Address // The current address. + addrs []resolver.Address // All addresses that the resolver resolved to. + // Use updateConnectivityState for updating addrConn's connectivity state. state connectivity.State // ready is closed and becomes nil when a new transport is up or failed // due to timeout. - ready chan struct{} - transport transport.ClientTransport + ready chan struct{} - // The reason this addrConn is torn down. - tearDownErr error + tearDownErr error // The reason this addrConn is torn down. - connectRetryNum int + backoffIdx int // backoffDeadline is the time until which resetTransport needs to - // wait before increasing connectRetryNum count. + // wait before increasing backoffIdx count. backoffDeadline time.Time // connectDeadline is the time by which all connection // negotiations must complete. @@ -897,8 +889,11 @@ type addrConn struct { channelzID int64 // channelz unique identification number czData *channelzData + + successfulHandshake bool } +// Note: this requires a lock on ac.mu. func (ac *addrConn) updateConnectivityState(s connectivity.State) { ac.state = s if channelz.IsOn() { @@ -931,184 +926,280 @@ func (ac *addrConn) printf(format string, a ...interface{}) { } } -// resetTransport recreates a transport to the address for ac. The old -// transport will close itself on error or when the clientconn is closed. -// The created transport must receive initial settings frame from the server. -// In case that doesn't happen, transportMonitor will kill the newly created -// transport after connectDeadline has expired. -// In case there was an error on the transport before the settings frame was -// received, resetTransport resumes connecting to backends after the one that -// was previously connected to. In case end of the list is reached, resetTransport -// backs off until the original deadline. -// If the DialOption WithWaitForHandshake was set, resetTrasport returns -// successfully only after server settings are received. +// errorf records an error in ac's event log, unless ac has been closed. +// REQUIRES ac.mu is held. +func (ac *addrConn) errorf(format string, a ...interface{}) { + if ac.events != nil { + ac.events.Errorf(format, a...) + } +} + +// resetTransport makes sure that a healthy ac.transport exists. // -// TODO(bar) make sure all state transitions are valid. -func (ac *addrConn) resetTransport() error { - ac.mu.Lock() - if ac.state == connectivity.Shutdown { - ac.mu.Unlock() - return errConnClosing - } - if ac.ready != nil { - close(ac.ready) - ac.ready = nil - } - ac.transport = nil - ridx := ac.reconnectIdx - ac.mu.Unlock() - ac.cc.mu.RLock() - ac.dopts.copts.KeepaliveParams = ac.cc.mkp - ac.cc.mu.RUnlock() - var backoffDeadline, connectDeadline time.Time - var resetBackoff chan struct{} - for connectRetryNum := 0; ; connectRetryNum++ { - ac.mu.Lock() - if ac.backoffDeadline.IsZero() { - // This means either a successful HTTP2 connection was established - // or this is the first time this addrConn is trying to establish a - // connection. 
- backoffFor := ac.dopts.bs.Backoff(connectRetryNum) // time.Duration. - resetBackoff = ac.resetBackoff - // This will be the duration that dial gets to finish. - dialDuration := getMinConnectTimeout() - if backoffFor > dialDuration { - // Give dial more time as we keep failing to connect. - dialDuration = backoffFor - } - start := time.Now() - backoffDeadline = start.Add(backoffFor) - connectDeadline = start.Add(dialDuration) - ridx = 0 // Start connecting from the beginning. - } else { - // Continue trying to connect with the same deadlines. - connectRetryNum = ac.connectRetryNum - backoffDeadline = ac.backoffDeadline - connectDeadline = ac.connectDeadline - ac.backoffDeadline = time.Time{} - ac.connectDeadline = time.Time{} - ac.connectRetryNum = 0 +// The transport will close itself when it encounters an error, or on GOAWAY, or on deadline waiting for handshake, or +// when the clientconn is closed. Each iteration creating a new transport will try a different address that the balancer +// assigned to the addrConn, until it has tried all addresses. Once it has tried all addresses, it will re-resolve to +// get a new address list. If an error is received, the list is re-resolved and the next reset attempt will try from the +// beginning. This method has backoff built in. The backoff amount starts at 0 and increases each time resolution occurs +// (addresses are exhausted). The backoff amount is reset to 0 each time a handshake is received. +// +// If the DialOption WithWaitForHandshake was set, resetTransport returns successfully only after handshake is received. +func (ac *addrConn) resetTransport(resolveNow bool) { + for { + // If this is the first in a line of resets, we want to resolve immediately. The only other time we + // want to reset is if we have tried all the addresses handed to us. + if resolveNow { + ac.mu.Lock() + ac.cc.resolveNow(resolver.ResolveNowOption{}) + ac.mu.Unlock() } + + if err := ac.nextAddr(); err != nil { + grpclog.Warningf("resetTransport: error from nextAddr: %v", err) + } + + ac.mu.Lock() if ac.state == connectivity.Shutdown { ac.mu.Unlock() - return errConnClosing + return } + if ac.ready != nil { + close(ac.ready) + ac.ready = nil + } + ac.transport = nil + + backoffIdx := ac.backoffIdx + backoffFor := ac.dopts.bs.Backoff(backoffIdx) + + // This will be the duration that dial gets to finish. + dialDuration := getMinConnectTimeout() + if backoffFor > dialDuration { + // Give dial more time as we keep failing to connect. + dialDuration = backoffFor + } + start := time.Now() + connectDeadline := start.Add(dialDuration) + ac.backoffDeadline = start.Add(backoffFor) + ac.connectDeadline = connectDeadline + + ac.mu.Unlock() + + ac.cc.mu.RLock() + ac.dopts.copts.KeepaliveParams = ac.cc.mkp + ac.cc.mu.RUnlock() + + ac.mu.Lock() + + if ac.state == connectivity.Shutdown { + ac.mu.Unlock() + return + } + ac.printf("connecting") if ac.state != connectivity.Connecting { ac.updateConnectivityState(connectivity.Connecting) ac.cc.handleSubConnStateChange(ac.acbw, ac.state) } - // copy ac.addrs in case of race - addrsIter := make([]resolver.Address, len(ac.addrs)) - copy(addrsIter, ac.addrs) + + addr := ac.addrs[ac.addrIdx] copts := ac.dopts.copts ac.mu.Unlock() - connected, err := ac.createTransport(connectRetryNum, ridx, backoffDeadline, connectDeadline, addrsIter, copts, resetBackoff) - if err != nil { - return err - } - if connected { - return nil - } - } -} -// createTransport creates a connection to one of the backends in addrs. 
-// It returns true if a connection was established. -func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, copts transport.ConnectOptions, resetBackoff chan struct{}) (bool, error) { - for i := ridx; i < len(addrs); i++ { - addr := addrs[i] if channelz.IsOn() { channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{ Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr), Severity: channelz.CtINFO, }) } - target := transport.TargetInfo{ - Addr: addr.Addr, - Metadata: addr.Metadata, - Authority: ac.cc.authority, - } - done := make(chan struct{}) - onPrefaceReceipt := func() { - ac.mu.Lock() - close(done) - if !ac.backoffDeadline.IsZero() { - // If we haven't already started reconnecting to - // other backends. - // Note, this can happen when writer notices an error - // and triggers resetTransport while at the same time - // reader receives the preface and invokes this closure. - ac.backoffDeadline = time.Time{} - ac.connectDeadline = time.Time{} - ac.connectRetryNum = 0 - } - ac.mu.Unlock() - } - // Do not cancel in the success path because of - // this issue in Go1.6: https://github.com/golang/go/issues/15078. - connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline) - if channelz.IsOn() { - copts.ChannelzParentID = ac.channelzID - } - newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt) - if err != nil { - cancel() - ac.cc.blockingpicker.updateConnectionError(err) - ac.mu.Lock() - if ac.state == connectivity.Shutdown { - // ac.tearDown(...) has been invoked. - ac.mu.Unlock() - return false, errConnClosing - } - ac.mu.Unlock() - grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err) + + if err := ac.createTransport(backoffIdx, addr, copts, connectDeadline); err != nil { continue } + + return + } +} + +// createTransport creates a connection to one of the backends in addrs. +func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error { + oneReset := sync.Once{} + skipReset := make(chan struct{}) + allowedToReset := make(chan struct{}) + prefaceReceived := make(chan struct{}) + onCloseCalled := make(chan struct{}) + + onGoAway := func(r transport.GoAwayReason) { + ac.mu.Lock() + ac.adjustParams(r) + ac.mu.Unlock() + select { + case <-skipReset: // The outer resetTransport loop will handle reconnection. + return + case <-allowedToReset: // We're in the clear to reset. + go oneReset.Do(func() { ac.resetTransport(false) }) + } + } + + onClose := func() { + close(onCloseCalled) + + select { + case <-skipReset: // The outer resetTransport loop will handle reconnection. + return + case <-allowedToReset: // We're in the clear to reset. + ac.mu.Lock() + ac.transport = nil + ac.mu.Unlock() + oneReset.Do(func() { ac.resetTransport(false) }) + } + } + + target := transport.TargetInfo{ + Addr: addr.Addr, + Metadata: addr.Metadata, + Authority: ac.cc.authority, + } + + onPrefaceReceipt := func() { + close(prefaceReceived) + + // TODO(deklerk): optimization; does anyone else actually use this lock? 
maybe we can just remove it for this scope
+		ac.mu.Lock()
+		ac.successfulHandshake = true
+		ac.backoffDeadline = time.Time{}
+		ac.connectDeadline = time.Time{}
+		ac.addrIdx = 0
+		ac.backoffIdx = 0
+		ac.mu.Unlock()
+	}
+
+	// Do not cancel in the success path because of this issue in Go1.6: https://github.com/golang/go/issues/15078.
+	connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
+	if channelz.IsOn() {
+		copts.ChannelzParentID = ac.channelzID
+	}
+
+	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
+
+	if err == nil {
+		// Note: a Timer returned by time.AfterFunc has a nil C field, so the
+		// timeout is signaled on a dedicated channel instead of on Timer.C.
+		prefaceTimedOut := make(chan struct{})
+		time.AfterFunc(connectDeadline.Sub(time.Now()), func() {
+			ac.mu.Lock()
+			select {
+			case <-onCloseCalled:
+				// The transport has already closed - noop.
+				ac.mu.Unlock()
+			case <-prefaceReceived:
+				// We got the preface just in the nick of time - huzzah!
+				ac.mu.Unlock()
+			default:
+				// We didn't get the preface in time.
+				ac.mu.Unlock()
+				close(prefaceTimedOut)
+				newTr.Close()
+			}
+		})
 		if ac.dopts.waitForHandshake {
 			select {
-			case <-done:
-			case <-connectCtx.Done():
-				// Didn't receive server preface, must kill this new transport now.
-				grpclog.Warningf("grpc: addrConn.createTransport failed to receive server preface before deadline.")
+			case <-prefaceTimedOut:
+				// We want to close but _not_ reset, because we're going into the transient-failure-and-return flow
+				// and go into the next cycle of the resetTransport loop.
+				close(skipReset)
 				newTr.Close()
-				continue
-			case <-ac.ctx.Done():
+				err = errors.New("timed out waiting for server handshake")
+			case <-prefaceReceived:
+				// We got the preface - huzzah! things are good.
+			case <-onCloseCalled:
+				// The transport has already closed - noop.
 			}
 		}
+	}
+
+	if err != nil {
+		// newTr is either nil, or closed.
+
+		cancel()
+		ac.cc.blockingpicker.updateConnectionError(err)
 		ac.mu.Lock()
 		if ac.state == connectivity.Shutdown {
+			// ac.tearDown(...) has been invoked.
 			ac.mu.Unlock()
-			// ac.tearDonn(...) has been invoked.
-			newTr.Close()
-			return false, errConnClosing
+
+			// We don't want to reset during this close because we prefer to kick out of this function and let the loop
+			// in resetTransport take care of reconnecting.
+			close(skipReset)
+
+			return errConnClosing
 		}
-		ac.printf("ready")
-		ac.updateConnectivityState(connectivity.Ready)
+		ac.updateConnectivityState(connectivity.TransientFailure)
 		ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
-		ac.transport = newTr
-		ac.curAddr = addr
-		if ac.ready != nil {
-			close(ac.ready)
-			ac.ready = nil
-		}
-		select {
-		case <-done:
-			// If the server has responded back with preface already,
-			// don't set the reconnect parameters.
-		default:
-			ac.connectRetryNum = connectRetryNum
-			ac.backoffDeadline = backoffDeadline
-			ac.connectDeadline = connectDeadline
-			ac.reconnectIdx = i + 1 // Start reconnecting from the next backend in the list.
-		}
 		ac.mu.Unlock()
-		return true, nil
+		grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
+
+		// We don't want to reset during this close because we prefer to kick out of this function and let the loop
+		// in resetTransport take care of reconnecting.
+		close(skipReset)
+
+		return err
 	}
+
 	ac.mu.Lock()
+
 	if ac.state == connectivity.Shutdown {
 		ac.mu.Unlock()
-		return false, errConnClosing
+
+		// We don't want to reset during this close because we prefer to kick out of this function and let the loop
+		// in resetTransport take care of reconnecting.
+ close(skipReset) + + newTr.Close() + return errConnClosing + } + + ac.printf("ready") + ac.updateConnectivityState(connectivity.Ready) + ac.cc.handleSubConnStateChange(ac.acbw, ac.state) + ac.transport = newTr + ac.curAddr = addr + if ac.ready != nil { + close(ac.ready) + ac.ready = nil + } + + ac.mu.Unlock() + + // Ok, _now_ we will finally let the transport reset if it encounters a closable error. Without this, the reader + // goroutine failing races with all the code in this method that sets the connection to "ready". + close(allowedToReset) + return nil +} + +// nextAddr increments the addrIdx if there are more addresses to try. If +// there are no more addrs to try it will re-resolve, set addrIdx to 0, and +// increment the backoffIdx. +func (ac *addrConn) nextAddr() error { + ac.mu.Lock() + + // If a handshake has been observed, we expect the counters to have manually + // been reset so we'll just return, since we want the next usage to start + // at index 0. + if ac.successfulHandshake { + ac.successfulHandshake = false + ac.mu.Unlock() + return nil + } + + if ac.addrIdx < len(ac.addrs)-1 { + ac.addrIdx++ + ac.mu.Unlock() + return nil + } + + ac.addrIdx = 0 + ac.backoffIdx++ + + if ac.state == connectivity.Shutdown { + ac.mu.Unlock() + return errConnClosing } ac.updateConnectivityState(connectivity.TransientFailure) ac.cc.handleSubConnStateChange(ac.acbw, ac.state) @@ -1117,113 +1208,35 @@ func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, close(ac.ready) ac.ready = nil } + backoffDeadline := ac.backoffDeadline + b := ac.resetBackoff ac.mu.Unlock() timer := time.NewTimer(backoffDeadline.Sub(time.Now())) select { case <-timer.C: - case <-resetBackoff: + case <-b: timer.Stop() case <-ac.ctx.Done(): timer.Stop() - return false, ac.ctx.Err() + return ac.ctx.Err() } - return false, nil + return nil } func (ac *addrConn) resetConnectBackoff() { ac.mu.Lock() close(ac.resetBackoff) + ac.backoffIdx = 0 ac.resetBackoff = make(chan struct{}) - ac.connectRetryNum = 0 ac.mu.Unlock() } -// Run in a goroutine to track the error in transport and create the -// new transport if an error happens. It returns when the channel is closing. -func (ac *addrConn) transportMonitor() { - for { - var timer *time.Timer - var cdeadline <-chan time.Time - ac.mu.Lock() - t := ac.transport - if !ac.connectDeadline.IsZero() { - timer = time.NewTimer(ac.connectDeadline.Sub(time.Now())) - cdeadline = timer.C - } - ac.mu.Unlock() - // Block until we receive a goaway or an error occurs. - select { - case <-t.GoAway(): - done := t.Error() - cleanup := t.Close - // Since this transport will be orphaned (won't have a transportMonitor) - // we need to launch a goroutine to keep track of clientConn.Close() - // happening since it might not be noticed by any other goroutine for a while. - go func() { - <-done - cleanup() - }() - case <-t.Error(): - // In case this is triggered because clientConn.Close() - // was called, we want to immeditately close the transport - // since no other goroutine might notice it for a while. - t.Close() - case <-cdeadline: - ac.mu.Lock() - // This implies that client received server preface. - if ac.backoffDeadline.IsZero() { - ac.mu.Unlock() - continue - } - ac.mu.Unlock() - timer = nil - // No server preface received until deadline. - // Kill the connection. - grpclog.Warningf("grpc: addrConn.transportMonitor didn't get server preface after waiting. 
Closing the new transport now.") - t.Close() - } - if timer != nil { - timer.Stop() - } - // If a GoAway happened, regardless of error, adjust our keepalive - // parameters as appropriate. - select { - case <-t.GoAway(): - ac.adjustParams(t.GetGoAwayReason()) - default: - } - ac.mu.Lock() - if ac.state == connectivity.Shutdown { - ac.mu.Unlock() - return - } - // Set connectivity state to TransientFailure before calling - // resetTransport. Transition READY->CONNECTING is not valid. - ac.updateConnectivityState(connectivity.TransientFailure) - ac.cc.handleSubConnStateChange(ac.acbw, ac.state) - ac.cc.resolveNow(resolver.ResolveNowOption{}) - ac.curAddr = resolver.Address{} - ac.mu.Unlock() - if err := ac.resetTransport(); err != nil { - ac.mu.Lock() - ac.printf("transport exiting: %v", err) - ac.mu.Unlock() - grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err) - if err != errConnClosing { - // Keep this ac in cc.conns, to get the reason it's torn down. - ac.tearDown(err) - } - return - } - } -} - // getReadyTransport returns the transport if ac's state is READY. // Otherwise it returns nil, false. // If ac's state is IDLE, it will trigger ac to connect. func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) { ac.mu.Lock() - if ac.state == connectivity.Ready { + if ac.state == connectivity.Ready && ac.transport != nil { t := ac.transport ac.mu.Unlock() return t, true @@ -1248,21 +1261,26 @@ func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) { func (ac *addrConn) tearDown(err error) { ac.cancel() ac.mu.Lock() - defer ac.mu.Unlock() if ac.state == connectivity.Shutdown { + ac.mu.Unlock() return } + // We have to set the state to Shutdown before we call ac.transports.GracefulClose, because it signals to + // onClose not to try reconnecting the transport. + ac.updateConnectivityState(connectivity.Shutdown) + ac.tearDownErr = err + ac.cc.handleSubConnStateChange(ac.acbw, ac.state) ac.curAddr = resolver.Address{} if err == errConnDrain && ac.transport != nil { // GracefulClose(...) may be executed multiple times when // i) receiving multiple GoAway frames from the server; or // ii) there are concurrent name resolver/Balancer triggered // address removal and GoAway. + // We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu. + ac.mu.Unlock() ac.transport.GracefulClose() + ac.mu.Lock() } - ac.updateConnectivityState(connectivity.Shutdown) - ac.tearDownErr = err - ac.cc.handleSubConnStateChange(ac.acbw, ac.state) if ac.events != nil { ac.events.Finish() ac.events = nil @@ -1284,6 +1302,7 @@ func (ac *addrConn) tearDown(err error) { // the entity beng deleted, and thus prevent it from being deleted right away. channelz.RemoveEntry(ac.channelzID) } + ac.mu.Unlock() } func (ac *addrConn) getState() connectivity.State { diff --git a/internal/transport/http2_client.go b/internal/transport/http2_client.go index 904e790c4..0787d6237 100644 --- a/internal/transport/http2_client.go +++ b/internal/transport/http2_client.go @@ -112,6 +112,9 @@ type http2Client struct { // Fields below are for channelz metric collection. 
channelzID int64 // channelz unique identification number czData *channelzData + + onGoAway func(GoAwayReason) + onClose func() } func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) { @@ -140,7 +143,7 @@ func isTemporary(err error) bool { // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2 // and starts to receive messages on it. Non-nil error returns if construction // fails. -func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func()) (_ *http2Client, err error) { +func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) { scheme := "http" ctx, cancel := context.WithCancel(ctx) defer func() { @@ -223,6 +226,8 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne streamQuota: defaultMaxStreamsClient, streamsQuotaAvailable: make(chan struct{}, 1), czData: new(channelzData), + onGoAway: onGoAway, + onClose: func() {}, } t.controlBuf = newControlBuffer(t.ctxDone) if opts.InitialWindowSize >= defaultWindowSize { @@ -255,10 +260,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne t.keepaliveEnabled = true go t.keepalive() } - // Start the reader goroutine for incoming message. Each transport has - // a dedicated goroutine which reads HTTP2 frame from network. Then it - // dispatches the frame to the corresponding stream entity. - go t.reader() // Send connection preface to server. n, err := t.conn.Write(clientPreface) if err != nil { @@ -295,6 +296,15 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err) } } + + // We only assign onClose after we're sure there can not be any more t.Close calls in this goroutine, because + // onClose may (frequently does) block. + t.onClose = onClose + + // Start the reader goroutine for incoming message. Each transport has + // a dedicated goroutine which reads HTTP2 frame from network. Then it + // dispatches the frame to the corresponding stream entity. + go t.reader() t.framer.writer.Flush() go func() { t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst) @@ -720,6 +730,10 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2. // Close kicks off the shutdown process of the transport. This should be called // only once on a transport. Once it is called, the transport should not be // accessed any more. +// +// This method blocks until the addrConn that initiated this transport is +// re-connected. This happens because t.onClose() begins reconnect logic at the +// addrConn level and blocks until the addrConn is successfully connected. func (t *http2Client) Close() error { t.mu.Lock() // Make sure we only Close once. @@ -747,6 +761,7 @@ func (t *http2Client) Close() error { } t.statsHandler.HandleConn(t.ctx, connEnd) } + t.onClose() return err } @@ -1043,6 +1058,9 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { close(t.goAway) t.state = draining t.controlBuf.put(&incomingGoAway{}) + + // This has to be a new goroutine because we're still using the current goroutine to read in the transport. + t.onGoAway(t.goAwayReason) } // All streams with IDs greater than the GoAwayId // and smaller than the previous GoAway ID should be killed. 
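
The hunks above complete the callback wiring: the addrConn hands onGoAway and onClose into the transport, handleGoAway invokes onGoAway from the reader goroutine when a GOAWAY frame arrives, and Close invokes onClose as its last step. Two details are easy to miss. First, the reconnect triggered by onGoAway must run on a fresh goroutine (createTransport does this with go oneReset.Do(...)), because the goroutine delivering the GOAWAY is the reader, which is still needed to drain the transport. Second, the constructor leaves onClose as a no-op until construction is past every point that might call t.Close, since the real callback may block. Below is a minimal, self-contained sketch of this pattern; the names fakeTransport and newFakeTransport are illustrative and not part of the patch.

package main

import (
	"fmt"
	"sync"
)

// fakeTransport stands in for http2Client: it reports its lifecycle to the
// owner through a callback instead of being watched by a monitor goroutine.
type fakeTransport struct {
	mu      sync.Mutex
	closed  bool
	onClose func()
}

// newFakeTransport mirrors the deferred assignment above: onClose stays a
// no-op until construction can no longer call Close, so a failed handshake
// never fires a blocking owner callback.
func newFakeTransport(onClose func()) *fakeTransport {
	t := &fakeTransport{onClose: func() {}}
	// ... handshake work that may call t.Close() on failure would go here ...
	t.onClose = onClose // safe now: no more construction-path Close calls
	return t
}

func (t *fakeTransport) Close() {
	t.mu.Lock()
	if t.closed { // make sure we only Close once
		t.mu.Unlock()
		return
	}
	t.closed = true
	t.mu.Unlock()
	t.onClose() // synchronous: the owner decides whether to reconnect
}

func main() {
	var oneReset sync.Once
	tr := newFakeTransport(func() {
		// GOAWAY handling and the Close that follows it both funnel here,
		// so the owner collapses them into a single reset, the way
		// createTransport does with its sync.Once.
		oneReset.Do(func() { fmt.Println("reconnecting exactly once") })
	})
	tr.Close()
	tr.Close() // second Close is a no-op
}
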
@@ -1159,15 +1177,16 @@ func (t *http2Client) reader() { // Check the validity of server preface. frame, err := t.framer.fr.ReadFrame() if err != nil { - t.Close() + t.Close() // this kicks off resetTransport, so must be last before return return } + t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!) if t.keepaliveEnabled { atomic.CompareAndSwapUint32(&t.activity, 0, 1) } sf, ok := frame.(*http2.SettingsFrame) if !ok { - t.Close() + t.Close() // this kicks off resetTransport, so must be last before return return } t.onSuccess() diff --git a/internal/transport/transport.go b/internal/transport/transport.go index fdf8ad684..2608c5e5c 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -494,8 +494,8 @@ type TargetInfo struct { // NewClientTransport establishes the transport with the required ConnectOptions // and returns it to the caller. -func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func()) (ClientTransport, error) { - return newHTTP2Client(connectCtx, ctx, target, opts, onSuccess) +func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) { + return newHTTP2Client(connectCtx, ctx, target, opts, onSuccess, onGoAway, onClose) } // Options provides additional hints and information for message diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index 109b37bc2..a5a59d567 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -397,17 +397,17 @@ func setUpServerOnly(t *testing.T, port int, serverConfig *ServerConfig, ht hTyp } func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, *http2Client, func()) { - return setUpWithOptions(t, port, &ServerConfig{MaxStreams: maxStreams}, ht, ConnectOptions{}, func() {}) + return setUpWithOptions(t, port, &ServerConfig{MaxStreams: maxStreams}, ht, ConnectOptions{}) } -func setUpWithOptions(t *testing.T, port int, serverConfig *ServerConfig, ht hType, copts ConnectOptions, onHandshake func()) (*server, *http2Client, func()) { +func setUpWithOptions(t *testing.T, port int, serverConfig *ServerConfig, ht hType, copts ConnectOptions) (*server, *http2Client, func()) { server := setUpServerOnly(t, port, serverConfig, ht) addr := "localhost:" + server.port target := TargetInfo{ Addr: addr, } connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) - ct, connErr := NewClientTransport(connectCtx, context.Background(), target, copts, onHandshake) + ct, connErr := NewClientTransport(connectCtx, context.Background(), target, copts, func() {}, func(GoAwayReason) {}, func() {}) if connErr != nil { cancel() // Do not cancel in success path. t.Fatalf("failed to create transport: %v", connErr) @@ -432,7 +432,7 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con done <- conn }() connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) - tr, err := NewClientTransport(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts, func() {}) + tr, err := NewClientTransport(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts, func() {}, func(GoAwayReason) {}, func() {}) if err != nil { cancel() // Do not cancel in success path. // Server clean-up. 
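
The reader hunk above also changes how the handshake deadline is handled: once the server's settings frame is read, the connection's read deadline is cleared by passing the zero time.Time. The arming side is not visible in these hunks, so the sketch below assumes the deadline was set while dialing; readPreface and the nine-byte frame-header size are illustrative, not the transport's actual code.

package preface

import (
	"io"
	"net"
	"time"
)

// readPreface arms a read deadline for the first frame and disarms it once
// the frame arrives, so a silent server fails the handshake instead of
// hanging the reader forever.
func readPreface(conn net.Conn, deadline time.Time) ([]byte, error) {
	if err := conn.SetReadDeadline(deadline); err != nil {
		return nil, err
	}
	hdr := make([]byte, 9) // an HTTP/2 frame header is 9 bytes
	if _, err := io.ReadFull(conn, hdr); err != nil {
		return nil, err // an expired deadline surfaces here as a timeout error
	}
	// The frame arrived in time; the zero time.Time removes the deadline
	// for all subsequent reads.
	if err := conn.SetReadDeadline(time.Time{}); err != nil {
		return nil, err
	}
	return hdr, nil
}
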
@@ -449,7 +449,7 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con // sends status error to concurrent stream reader. func TestInflightStreamClosing(t *testing.T) { serverConfig := &ServerConfig{} - server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {}) + server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer cancel() defer server.stop() defer client.Close() @@ -492,7 +492,7 @@ func TestMaxConnectionIdle(t *testing.T) { MaxConnectionIdle: 2 * time.Second, }, } - server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {}) + server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer cancel() defer server.stop() defer client.Close() @@ -520,7 +520,7 @@ func TestMaxConnectionIdleNegative(t *testing.T) { MaxConnectionIdle: 2 * time.Second, }, } - server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {}) + server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer cancel() defer server.stop() defer client.Close() @@ -547,7 +547,7 @@ func TestMaxConnectionAge(t *testing.T) { MaxConnectionAge: 2 * time.Second, }, } - server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {}) + server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer cancel() defer server.stop() defer client.Close() @@ -580,7 +580,7 @@ func TestKeepaliveServer(t *testing.T) { Timeout: 1 * time.Second, }, } - server, c, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {}) + server, c, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer cancel() defer server.stop() defer c.Close() @@ -624,7 +624,7 @@ func TestKeepaliveServerNegative(t *testing.T) { Timeout: 1 * time.Second, }, } - server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {}) + server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer cancel() defer server.stop() defer client.Close() @@ -718,7 +718,7 @@ func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) { Time: 2 * time.Second, // Keepalive time = 2 sec. Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. PermitWithoutStream: true, // Run keepalive even with no RPCs. 
- }}, func() {}) + }}) defer cancel() defer s.stop() defer tr.Close() @@ -745,7 +745,7 @@ func TestKeepaliveServerEnforcementWithAbusiveClientNoRPC(t *testing.T) { PermitWithoutStream: true, }, } - server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions, func() {}) + server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions) defer cancel() defer server.stop() defer client.Close() @@ -779,7 +779,7 @@ func TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) { Timeout: 1 * time.Second, }, } - server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions, func() {}) + server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions) defer cancel() defer server.stop() defer client.Close() @@ -818,7 +818,7 @@ func TestKeepaliveServerEnforcementWithObeyingClientNoRPC(t *testing.T) { PermitWithoutStream: true, }, } - server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions, func() {}) + server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions) defer cancel() defer server.stop() defer client.Close() @@ -845,7 +845,7 @@ func TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) { Timeout: 1 * time.Second, }, } - server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions, func() {}) + server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions) defer cancel() defer server.stop() defer client.Close() @@ -994,7 +994,7 @@ func TestLargeMessageWithDelayRead(t *testing.T) { InitialWindowSize: defaultWindowSize, InitialConnWindowSize: defaultWindowSize, } - server, ct, cancel := setUpWithOptions(t, 0, sc, delayRead, co, func() {}) + server, ct, cancel := setUpWithOptions(t, 0, sc, delayRead, co) defer cancel() defer server.stop() defer ct.Close() @@ -1083,6 +1083,7 @@ func TestLargeMessageWithDelayRead(t *testing.T) { func TestGracefulClose(t *testing.T) { server, ct, cancel := setUp(t, 0, math.MaxUint32, pingpong) + defer cancel() defer func() { // Stop the server's listener to make the server's goroutines terminate // (after the last active stream is done). 
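
The TestGracefulClose hunk above moves cancel into its own defer immediately after setUp; the next hunk removes the old call from inside the cleanup closure. The point is ordering: deferred calls run last-in, first-out, so a defer registered first runs last, after every later-registered cleanup. A tiny self-contained illustration of that ordering:

package main

import "fmt"

func main() {
	defer fmt.Println("runs third: cancel()") // registered first, runs last
	defer fmt.Println("runs second: server.stop()")
	defer fmt.Println("runs first: leak check")
	fmt.Println("test body")
}
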
@@ -1092,7 +1093,6 @@ func TestGracefulClose(t *testing.T) { leakcheck.Check(t) // Correctly clean up the server server.stop() - cancel() }() ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10)) defer cancel() @@ -1147,8 +1147,8 @@ func TestGracefulClose(t *testing.T) { } func TestLargeMessageSuspension(t *testing.T) { - server, ct, cancelsvr := setUp(t, 0, math.MaxUint32, suspended) - defer cancelsvr() + server, ct, cancel := setUp(t, 0, math.MaxUint32, suspended) + defer cancel() callHdr := &CallHdr{ Host: "localhost", Method: "foo.Large", @@ -1185,7 +1185,7 @@ func TestMaxStreams(t *testing.T) { serverConfig := &ServerConfig{ MaxStreams: 1, } - server, ct, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {}) + server, ct, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer cancel() defer ct.Close() defer server.stop() @@ -1319,7 +1319,7 @@ func TestClientConnDecoupledFromApplicationRead(t *testing.T) { InitialWindowSize: defaultWindowSize, InitialConnWindowSize: defaultWindowSize, } - server, client, cancel := setUpWithOptions(t, 0, &ServerConfig{}, notifyCall, connectOptions, func() {}) + server, client, cancel := setUpWithOptions(t, 0, &ServerConfig{}, notifyCall, connectOptions) defer cancel() defer server.stop() defer client.Close() @@ -1406,7 +1406,7 @@ func TestServerConnDecoupledFromApplicationRead(t *testing.T) { InitialWindowSize: defaultWindowSize, InitialConnWindowSize: defaultWindowSize, } - server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {}) + server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer cancel() defer server.stop() defer client.Close() @@ -1636,7 +1636,7 @@ func TestClientWithMisbehavedServer(t *testing.T) { }() connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) defer cancel() - ct, err := NewClientTransport(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, func() {}) + ct, err := NewClientTransport(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, func() {}, func(GoAwayReason) {}, func() {}) if err != nil { t.Fatalf("Error while creating client transport: %v", err) } @@ -1792,7 +1792,7 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig) InitialWindowSize: wc.clientStream, InitialConnWindowSize: wc.clientConn, } - server, client, cancel := setUpWithOptions(t, 0, sc, pingpong, co, func() {}) + server, client, cancel := setUpWithOptions(t, 0, sc, pingpong, co) defer cancel() defer server.stop() defer client.Close() @@ -2024,7 +2024,7 @@ func (s *httpServer) cleanUp() { } } -func setUpHTTPStatusTest(t *testing.T, httpStatus int, wh writeHeaders) (stream *Stream, cleanUp func(), cancel func()) { +func setUpHTTPStatusTest(t *testing.T, httpStatus int, wh writeHeaders) (stream *Stream, cleanUp func()) { var ( err error lis net.Listener @@ -2056,13 +2056,13 @@ func setUpHTTPStatusTest(t *testing.T, httpStatus int, wh writeHeaders) (stream wh: wh, } server.start(t, lis) - var connectCtx context.Context - connectCtx, cancel = context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) - client, err = newHTTP2Client(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, func() {}) + connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) 
+ client, err = newHTTP2Client(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, func() {}, func(GoAwayReason) {}, func() {}) if err != nil { cancel() // Do not cancel in success path. t.Fatalf("Error creating client. Err: %v", err) } + defer cancel() stream, err = client.NewStream(context.Background(), &CallHdr{Method: "bogus/method"}) if err != nil { t.Fatalf("Error creating stream at client-side. Err: %v", err) @@ -2077,8 +2077,7 @@ func TestHTTPToGRPCStatusMapping(t *testing.T) { } func testHTTPToGRPCStatusMapping(t *testing.T, httpStatus int, wh writeHeaders) { - stream, cleanUp, cancel := setUpHTTPStatusTest(t, httpStatus, wh) - defer cancel() + stream, cleanUp := setUpHTTPStatusTest(t, httpStatus, wh) defer cleanUp() want := httpStatusConvTab[httpStatus] buf := make([]byte, 8) @@ -2096,8 +2095,7 @@ func testHTTPToGRPCStatusMapping(t *testing.T, httpStatus int, wh writeHeaders) } func TestHTTPStatusOKAndMissingGRPCStatus(t *testing.T) { - stream, cleanUp, cancel := setUpHTTPStatusTest(t, http.StatusOK, writeOneHeader) - defer cancel() + stream, cleanUp := setUpHTTPStatusTest(t, http.StatusOK, writeOneHeader) defer cleanUp() buf := make([]byte, 8) _, err := stream.Read(buf) diff --git a/status/status.go b/status/status.go index 9c61b0945..897321bab 100644 --- a/status/status.go +++ b/status/status.go @@ -126,7 +126,9 @@ func FromError(err error) (s *Status, ok bool) { if err == nil { return &Status{s: &spb.Status{Code: int32(codes.OK)}}, true } - if se, ok := err.(interface{ GRPCStatus() *Status }); ok { + if se, ok := err.(interface { + GRPCStatus() *Status + }); ok { return se.GRPCStatus(), true } return New(codes.Unknown, err.Error()), false @@ -182,7 +184,9 @@ func Code(err error) codes.Code { if err == nil { return codes.OK } - if se, ok := err.(interface{ GRPCStatus() *Status }); ok { + if se, ok := err.(interface { + GRPCStatus() *Status + }); ok { return se.GRPCStatus().Code() } return codes.Unknown
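
The status.go hunks above only reformat the anonymous-interface assertion, but that assertion is the whole mechanism: FromError and Code recognize any error type carrying a GRPCStatus() *Status method, without the package having to know the concrete type. A small sketch of a type satisfying that contract; statusError is an illustrative name, not a type from the package.

package main

import (
	"fmt"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// statusError carries a gRPC status. Any error type with a
// GRPCStatus() *status.Status method satisfies the anonymous interface
// asserted in FromError and Code above.
type statusError struct{ st *status.Status }

func (e *statusError) Error() string              { return e.st.Message() }
func (e *statusError) GRPCStatus() *status.Status { return e.st }

func main() {
	var err error = &statusError{st: status.New(codes.NotFound, "no such thing")}

	if s, ok := status.FromError(err); ok {
		fmt.Println(s.Code(), s.Message()) // NotFound no such thing
	}
	fmt.Println(status.Code(err)) // NotFound, via the same assertion
}
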