mirror of https://github.com/grpc/grpc-go.git
Revert "internal: remove transportMonitor, replace with callbacks" (#2252)
Reverts grpc/grpc-go#2219 because of #2251
This commit is contained in:
parent 97da9e087c
commit b20cbb449d

clientconn.go (507)
@@ -532,10 +532,9 @@ func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivi
 // Caller needs to make sure len(addrs) > 0.
 func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) {
 	ac := &addrConn{
 		cc:    cc,
 		addrs: addrs,
 		dopts: cc.dopts,
-		successfulHandshake: true, // make the first nextAddr() call _not_ move addrIdx up by 1
 	}
 	ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
 	// Track ac in cc. This needs to be done before any getTransport(...) is called.
@@ -627,7 +626,17 @@ func (ac *addrConn) connect() error {
 	ac.mu.Unlock()

 	// Start a goroutine connecting to the server asynchronously.
-	go ac.resetTransport(false)
+	go func() {
+		if err := ac.resetTransport(); err != nil {
+			grpclog.Warningf("Failed to dial %s: %v; please retry.", ac.addrs[0].Addr, err)
+			if err != errConnClosing {
+				// Keep this ac in cc.conns, to get the reason it's torn down.
+				ac.tearDown(err)
+			}
+			return
+		}
+		ac.transportMonitor()
+	}()
 	return nil
 }

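The hunk above restores connect()'s fire-and-forget goroutine: dial through resetTransport, then park in transportMonitor until the channel closes. A minimal, hypothetical Go sketch of that ownership pattern; dial, monitor, connect and errConnClosing below are illustrative stand-ins, not grpc-go APIs:

package main

import (
	"errors"
	"fmt"
	"time"
)

var errConnClosing = errors.New("conn is closing")

type conn struct{ errCh chan error }

// dial stands in for resetTransport: it either establishes a
// connection or reports a terminal error.
func dial() (*conn, error) {
	return &conn{errCh: make(chan error, 1)}, nil
}

// monitor stands in for transportMonitor: it blocks on the
// connection's error channel and would redial on each failure.
func monitor(c *conn) {
	for err := range c.errCh {
		fmt.Println("transport failed, redialing:", err)
	}
}

// connect returns immediately; one goroutine owns the whole lifecycle.
func connect() error {
	go func() {
		c, err := dial()
		if err != nil {
			if err != errConnClosing {
				// tear down here, keeping the reason, as the diff does.
			}
			return
		}
		monitor(c)
	}()
	return nil
}

func main() {
	connect()
	time.Sleep(10 * time.Millisecond)
}

As in the restored code, a terminal errConnClosing skips the teardown path, so a connection that is already closing is not torn down twice.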
@@ -656,7 +665,7 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
 	grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
 	if curAddrFound {
 		ac.addrs = addrs
-		ac.addrIdx = 0 // Start reconnecting from beginning in the new list.
+		ac.reconnectIdx = 0 // Start reconnecting from beginning in the new list.
 	}

 	return curAddrFound
@@ -787,28 +796,27 @@ type addrConn struct {
 	ctx    context.Context
 	cancel context.CancelFunc

-	mu     sync.Mutex
 	cc     *ClientConn
+	addrs  []resolver.Address
 	dopts  dialOptions
 	events trace.EventLog
 	acbw   balancer.SubConn

-	transport transport.ClientTransport // The current transport.
-	addrIdx   int // The index in addrs list to start reconnecting from.
-	curAddr   resolver.Address // The current address.
-	addrs     []resolver.Address // All addresses that the resolver resolved to.
-
-	state connectivity.State
+	mu           sync.Mutex
+	curAddr      resolver.Address
+	reconnectIdx int // The index in addrs list to start reconnecting from.
+	state        connectivity.State
 	// ready is closed and becomes nil when a new transport is up or failed
 	// due to timeout.
 	ready chan struct{}
+	transport transport.ClientTransport

-	tearDownErr error // The reason this addrConn is torn down.
+	// The reason this addrConn is torn down.
+	tearDownErr error

-	backoffIdx int
+	connectRetryNum int
 	// backoffDeadline is the time until which resetTransport needs to
-	// wait before increasing backoffIdx count.
+	// wait before increasing connectRetryNum count.
 	backoffDeadline time.Time
 	// connectDeadline is the time by which all connection
 	// negotiations must complete.
@@ -820,8 +828,6 @@ type addrConn struct {
 	callsSucceeded      int64
 	callsFailed         int64
 	lastCallStartedTime time.Time
-
-	successfulHandshake bool
 }

 // adjustParams updates parameters used to create transports upon
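The struct reshuffle above is mostly lock discipline: in the restored layout everything from mu down through transport is understood to be guarded by mu, while the fields above it are set once at construction. A small, hypothetical sketch of the same Go convention (counterConn is illustrative, not part of grpc-go):

package main

import (
	"fmt"
	"sync"
)

type counterConn struct {
	name string // immutable after construction, no lock needed

	mu    sync.Mutex
	calls int // guarded by mu, like the fields below addrConn.mu
}

func (c *counterConn) inc() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.calls++
}

func main() {
	c := &counterConn{name: "example"}
	c.inc()
	fmt.Println(c.name, c.calls)
}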
@@ -854,268 +860,176 @@ func (ac *addrConn) errorf(format string, a ...interface{}) {
 	}
 }

-// resetTransport makes sure that a healthy ac.transport exists.
-//
-// The transport will close itself when it encounters an error, or on GOAWAY, or on deadline waiting for handshake, or
-// when the clientconn is closed. Each iteration creating a new transport will try a different address that the balancer
-// assigned to the addrConn, until it has tried all addresses. Once it has tried all addresses, it will re-resolve to
-// get a new address list. If an error is received, the list is re-resolved and the next reset attempt will try from the
-// beginning. This method has backoff built in. The backoff amount starts at 0 and increases each time resolution occurs
-// (addresses are exhausted). The backoff amount is reset to 0 each time a handshake is received.
-//
-// If the DialOption WithWaitForHandshake was set, resetTransport returns successfully only after handshake is received.
-func (ac *addrConn) resetTransport(resolveNow bool) {
-	for {
-		// If this is the first in a line of resets, we want to resolve immediately. The only other time we
-		// want to reset is if we have tried all the addresses handed to us.
-		if resolveNow {
-			ac.mu.Lock()
-			ac.cc.resolveNow(resolver.ResolveNowOption{})
-			ac.mu.Unlock()
-		}
-
-		if err := ac.nextAddr(); err != nil {
-			grpclog.Warningf("resetTransport: error from nextAddr: %v", err)
-		}
-
-		ac.mu.Lock()
-		if ac.state == connectivity.Shutdown {
-			ac.mu.Unlock()
-			return
-		}
-		if ac.ready != nil {
-			close(ac.ready)
-			ac.ready = nil
-		}
-		ac.transport = nil
-
-		backoffIdx := ac.backoffIdx
-		backoffFor := ac.dopts.bs.Backoff(backoffIdx)
-
-		// This will be the duration that dial gets to finish.
-		dialDuration := getMinConnectTimeout()
-		if backoffFor > dialDuration {
-			// Give dial more time as we keep failing to connect.
-			dialDuration = backoffFor
-		}
-		start := time.Now()
-		connectDeadline := start.Add(dialDuration)
-		ac.backoffDeadline = start.Add(backoffFor)
-		ac.connectDeadline = connectDeadline
-
-		ac.mu.Unlock()
-
-		ac.cc.mu.RLock()
-		ac.dopts.copts.KeepaliveParams = ac.cc.mkp
-		ac.cc.mu.RUnlock()
-
-		ac.mu.Lock()
-		if ac.state == connectivity.Shutdown {
-			ac.mu.Unlock()
-			return
-		}
-
-		ac.printf("connecting")
-		if ac.state != connectivity.Connecting {
-			ac.state = connectivity.Connecting
-			ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
-		}
-
-		addr := ac.addrs[ac.addrIdx]
-		copts := ac.dopts.copts
-		ac.mu.Unlock()
-
-		if err := ac.createTransport(backoffIdx, addr, copts, connectDeadline); err != nil {
-			// errReadTimeOut indicates that the handshake was not received before
-			// the deadline. We exit here because the transport's reader goroutine will
-			// use onClose to reset the transport.
-			if err == errReadTimedOut {
-				return
-			}
-
-			continue
-		}
-
-		return
-	}
-}
-
-var errReadTimedOut = errors.New("read timed out")
-
-// createTransport creates a connection to one of the backends in addrs.
-func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
-	skipReset := make(chan struct{})
-	allowedToReset := make(chan struct{})
-	prefaceReceived := make(chan struct{})
-	onCloseCalled := make(chan struct{})
-
-	onGoAway := func(r transport.GoAwayReason) {
-		ac.mu.Lock()
-		ac.adjustParams(r)
-		ac.mu.Unlock()
-		go ac.resetTransport(false)
-	}
-
-	onClose := func() {
-		close(onCloseCalled)
-
-		select {
-		case <-skipReset: // The outer resetTransport loop will handle reconnection.
-			return
-		case <-allowedToReset: // We're in the clear to reset.
-			ac.mu.Lock()
-			ac.transport = nil
-			ac.mu.Unlock()
-			ac.resetTransport(false)
-		}
-	}
-
-	target := transport.TargetInfo{
-		Addr:      addr.Addr,
-		Metadata:  addr.Metadata,
-		Authority: ac.cc.authority,
-	}
-
-	onPrefaceReceipt := func() {
-		close(prefaceReceived)
-
-		// TODO(deklerk): optimization; does anyone else actually use this lock? maybe we can just remove it for this scope
-		ac.mu.Lock()
-		ac.successfulHandshake = true
-		ac.backoffDeadline = time.Time{}
-		ac.connectDeadline = time.Time{}
-		ac.addrIdx = 0
-		ac.backoffIdx = 0
-		ac.mu.Unlock()
-	}
-
-	// Do not cancel in the success path because of this issue in Go1.6: https://github.com/golang/go/issues/15078.
-	connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
-	if channelz.IsOn() {
-		copts.ChannelzParentID = ac.channelzID
-	}
-
-	newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
-
-	if err == nil {
-		prefaceTimer := time.AfterFunc(connectDeadline.Sub(time.Now()), func() {
-			ac.mu.Lock()
-			select {
-			case <-onCloseCalled:
-				// The transport has already closed - noop.
-				ac.mu.Unlock()
-			case <-prefaceReceived:
-				// We got the preface just in the nick of time - huzzah!
-				ac.mu.Unlock()
-			default:
-				// We didn't get the preface in time.
-				ac.mu.Unlock()
-				newTr.Close()
-			}
-		})
-
-		if ac.dopts.waitForHandshake {
-			select {
-			case <-prefaceTimer.C:
-				// We want to close but _not_ reset, because we're going into the transient-failure-and-return flow
-				// and go into the next cycle of the resetTransport loop.
-				close(skipReset)
-				newTr.Close()
-				err = errors.New("timed out")
-			case <-prefaceReceived:
-				// We got the preface - huzzah! things are good.
-			case <-onCloseCalled:
-				// The transport has already closed - noop.
-			}
-		}
-	}
-
-	if err != nil {
-		// newTr is either nil, or closed.
-
-		cancel()
-		ac.cc.blockingpicker.updateConnectionError(err)
-		ac.mu.Lock()
-		if ac.state == connectivity.Shutdown {
-			// ac.tearDown(...) has been invoked.
-			ac.mu.Unlock()
-
-			// We don't want to reset during this close because we prefer to kick out of this function and let the loop
-			// in resetTransport take care of reconnecting.
-			close(skipReset)
-
-			return errConnClosing
-		}
-		ac.state = connectivity.TransientFailure
-		ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
-		ac.mu.Unlock()
-		grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
-
-		// We don't want to reset during this close because we prefer to kick out of this function and let the loop
-		// in resetTransport take care of reconnecting.
-		close(skipReset)
-
-		return err
-	}
-
-	ac.mu.Lock()
-
-	if ac.state == connectivity.Shutdown {
-		ac.mu.Unlock()
-
-		// We don't want to reset during this close because we prefer to kick out of this function and let the loop
-		// in resetTransport take care of reconnecting.
-		close(skipReset)
-
-		newTr.Close()
-		return errConnClosing
-	}
-
-	ac.printf("ready")
-	ac.state = connectivity.Ready
-	ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
-	ac.transport = newTr
-	ac.curAddr = addr
-	if ac.ready != nil {
-		close(ac.ready)
-		ac.ready = nil
-	}
-
-	ac.mu.Unlock()
-
-	// Ok, _now_ we will finally let the transport reset if it encounters a closable error. Without this, the reader
-	// goroutine failing races with all the code in this method that sets the connection to "ready".
-	close(allowedToReset)
-	return nil
-}
-
-// nextAddr increments the addrIdx if there are more addresses to try. If
-// there are no more addrs to try it will re-resolve, set addrIdx to 0, and
-// increment the backoffIdx.
-func (ac *addrConn) nextAddr() error {
-	ac.mu.Lock()
-
-	// If a handshake has been observed, we expect the counters to have manually
-	// been reset so we'll just return, since we want the next usage to start
-	// at index 0.
-	if ac.successfulHandshake {
-		ac.successfulHandshake = false
-		ac.mu.Unlock()
-		return nil
-	}
-
-	if ac.addrIdx < len(ac.addrs)-1 {
-		ac.addrIdx++
-		ac.mu.Unlock()
-		return nil
-	}
-
-	ac.addrIdx = 0
-	ac.backoffIdx++
-
-	if ac.state == connectivity.Shutdown {
-		ac.mu.Unlock()
-		return errConnClosing
+// resetTransport recreates a transport to the address for ac. The old
+// transport will close itself on error or when the clientconn is closed.
+// The created transport must receive initial settings frame from the server.
+// In case that doesn't happen, transportMonitor will kill the newly created
+// transport after connectDeadline has expired.
+// In case there was an error on the transport before the settings frame was
+// received, resetTransport resumes connecting to backends after the one that
+// was previously connected to. In case end of the list is reached, resetTransport
+// backs off until the original deadline.
+// If the DialOption WithWaitForHandshake was set, resetTrasport returns
+// successfully only after server settings are received.
+//
+// TODO(bar) make sure all state transitions are valid.
+func (ac *addrConn) resetTransport() error {
+	ac.mu.Lock()
+	if ac.state == connectivity.Shutdown {
+		ac.mu.Unlock()
+		return errConnClosing
+	}
+	if ac.ready != nil {
+		close(ac.ready)
+		ac.ready = nil
+	}
+	ac.transport = nil
+	ridx := ac.reconnectIdx
+	ac.mu.Unlock()
+	ac.cc.mu.RLock()
+	ac.dopts.copts.KeepaliveParams = ac.cc.mkp
+	ac.cc.mu.RUnlock()
+	var backoffDeadline, connectDeadline time.Time
+	for connectRetryNum := 0; ; connectRetryNum++ {
+		ac.mu.Lock()
+		if ac.backoffDeadline.IsZero() {
+			// This means either a successful HTTP2 connection was established
+			// or this is the first time this addrConn is trying to establish a
+			// connection.
+			backoffFor := ac.dopts.bs.Backoff(connectRetryNum) // time.Duration.
+			// This will be the duration that dial gets to finish.
+			dialDuration := getMinConnectTimeout()
+			if backoffFor > dialDuration {
+				// Give dial more time as we keep failing to connect.
+				dialDuration = backoffFor
+			}
+			start := time.Now()
+			backoffDeadline = start.Add(backoffFor)
+			connectDeadline = start.Add(dialDuration)
+			ridx = 0 // Start connecting from the beginning.
+		} else {
+			// Continue trying to connect with the same deadlines.
+			connectRetryNum = ac.connectRetryNum
+			backoffDeadline = ac.backoffDeadline
+			connectDeadline = ac.connectDeadline
+			ac.backoffDeadline = time.Time{}
+			ac.connectDeadline = time.Time{}
+			ac.connectRetryNum = 0
+		}
+		if ac.state == connectivity.Shutdown {
+			ac.mu.Unlock()
+			return errConnClosing
+		}
+		ac.printf("connecting")
+		if ac.state != connectivity.Connecting {
+			ac.state = connectivity.Connecting
+			ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+		}
+		// copy ac.addrs in case of race
+		addrsIter := make([]resolver.Address, len(ac.addrs))
+		copy(addrsIter, ac.addrs)
+		copts := ac.dopts.copts
+		ac.mu.Unlock()
+		connected, err := ac.createTransport(connectRetryNum, ridx, backoffDeadline, connectDeadline, addrsIter, copts)
+		if err != nil {
+			return err
+		}
+		if connected {
+			return nil
+		}
+	}
+}
+
+// createTransport creates a connection to one of the backends in addrs.
+// It returns true if a connection was established.
+func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, connectDeadline time.Time, addrs []resolver.Address, copts transport.ConnectOptions) (bool, error) {
+	for i := ridx; i < len(addrs); i++ {
+		addr := addrs[i]
+		target := transport.TargetInfo{
+			Addr:      addr.Addr,
+			Metadata:  addr.Metadata,
+			Authority: ac.cc.authority,
+		}
+		done := make(chan struct{})
+		onPrefaceReceipt := func() {
+			ac.mu.Lock()
+			close(done)
+			if !ac.backoffDeadline.IsZero() {
+				// If we haven't already started reconnecting to
+				// other backends.
+				// Note, this can happen when writer notices an error
+				// and triggers resetTransport while at the same time
+				// reader receives the preface and invokes this closure.
+				ac.backoffDeadline = time.Time{}
+				ac.connectDeadline = time.Time{}
+				ac.connectRetryNum = 0
+			}
+			ac.mu.Unlock()
+		}
+		// Do not cancel in the success path because of
+		// this issue in Go1.6: https://github.com/golang/go/issues/15078.
+		connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
+		if channelz.IsOn() {
+			copts.ChannelzParentID = ac.channelzID
+		}
+		newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt)
+		if err != nil {
+			cancel()
+			ac.cc.blockingpicker.updateConnectionError(err)
+			ac.mu.Lock()
+			if ac.state == connectivity.Shutdown {
+				// ac.tearDown(...) has been invoked.
+				ac.mu.Unlock()
+				return false, errConnClosing
+			}
+			ac.mu.Unlock()
+			grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
+			continue
+		}
+		if ac.dopts.waitForHandshake {
+			select {
+			case <-done:
+			case <-connectCtx.Done():
+				// Didn't receive server preface, must kill this new transport now.
+				grpclog.Warningf("grpc: addrConn.createTransport failed to receive server preface before deadline.")
+				newTr.Close()
+				continue
+			case <-ac.ctx.Done():
+			}
+		}
+		ac.mu.Lock()
+		if ac.state == connectivity.Shutdown {
+			ac.mu.Unlock()
+			// ac.tearDonn(...) has been invoked.
+			newTr.Close()
+			return false, errConnClosing
+		}
+		ac.printf("ready")
+		ac.state = connectivity.Ready
+		ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+		ac.transport = newTr
+		ac.curAddr = addr
+		if ac.ready != nil {
+			close(ac.ready)
+			ac.ready = nil
+		}
+		select {
+		case <-done:
+			// If the server has responded back with preface already,
+			// don't set the reconnect parameters.
+		default:
+			ac.connectRetryNum = connectRetryNum
+			ac.backoffDeadline = backoffDeadline
+			ac.connectDeadline = connectDeadline
+			ac.reconnectIdx = i + 1 // Start reconnecting from the next backend in the list.
+		}
+		ac.mu.Unlock()
+		return true, nil
+	}
+	ac.mu.Lock()
+	if ac.state == connectivity.Shutdown {
+		ac.mu.Unlock()
+		return false, errConnClosing
 	}
 	ac.state = connectivity.TransientFailure
 	ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
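One piece of the restored resetTransport worth isolating is the deadline arithmetic: each retry computes a backoff deadline and a connect deadline from the same start time, and the dial window is the larger of the backoff period and a minimum connect timeout. A self-contained sketch under assumed values; the linear backoff and the 20-second floor below are illustrative, since the real policy lives in ac.dopts.bs and getMinConnectTimeout:

package main

import (
	"fmt"
	"time"
)

const minConnectTimeout = 20 * time.Second // stand-in for getMinConnectTimeout()

// backoff is a linear stand-in for ac.dopts.bs.Backoff.
func backoff(retries int) time.Duration {
	return time.Duration(retries) * 5 * time.Second
}

// deadlines mirrors the computation in the restored resetTransport:
// dial always gets at least minConnectTimeout, and more once the
// backoff period exceeds it.
func deadlines(retries int) (backoffDeadline, connectDeadline time.Time) {
	backoffFor := backoff(retries)
	dialDuration := minConnectTimeout
	if backoffFor > dialDuration {
		dialDuration = backoffFor // give dial more time as failures accumulate
	}
	start := time.Now()
	return start.Add(backoffFor), start.Add(dialDuration)
}

func main() {
	b1, c1 := deadlines(1)  // backoff 5s, below the 20s floor
	fmt.Println(c1.Sub(b1)) // 15s of dial window remains after backoff expires
	b6, c6 := deadlines(6)  // backoff 30s, above the floor
	fmt.Println(c6.Sub(b6)) // 0s: the two deadlines coincide
}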
@@ -1124,16 +1038,95 @@ func (ac *addrConn) nextAddr() error {
 		close(ac.ready)
 		ac.ready = nil
 	}
-	backoffDeadline := ac.backoffDeadline
 	ac.mu.Unlock()
 	timer := time.NewTimer(backoffDeadline.Sub(time.Now()))
 	select {
 	case <-timer.C:
 	case <-ac.ctx.Done():
 		timer.Stop()
-		return ac.ctx.Err()
+		return false, ac.ctx.Err()
+	}
+	return false, nil
+}
+
+// Run in a goroutine to track the error in transport and create the
+// new transport if an error happens. It returns when the channel is closing.
+func (ac *addrConn) transportMonitor() {
+	for {
+		var timer *time.Timer
+		var cdeadline <-chan time.Time
+		ac.mu.Lock()
+		t := ac.transport
+		if !ac.connectDeadline.IsZero() {
+			timer = time.NewTimer(ac.connectDeadline.Sub(time.Now()))
+			cdeadline = timer.C
+		}
+		ac.mu.Unlock()
+		// Block until we receive a goaway or an error occurs.
+		select {
+		case <-t.GoAway():
+			done := t.Error()
+			cleanup := t.Close
+			// Since this transport will be orphaned (won't have a transportMonitor)
+			// we need to launch a goroutine to keep track of clientConn.Close()
+			// happening since it might not be noticed by any other goroutine for a while.
+			go func() {
+				<-done
+				cleanup()
+			}()
+		case <-t.Error():
+			// In case this is triggered because clientConn.Close()
+			// was called, we want to immeditately close the transport
+			// since no other goroutine might notice it for a while.
+			t.Close()
+		case <-cdeadline:
+			ac.mu.Lock()
+			// This implies that client received server preface.
+			if ac.backoffDeadline.IsZero() {
+				ac.mu.Unlock()
+				continue
+			}
+			ac.mu.Unlock()
+			timer = nil
+			// No server preface received until deadline.
+			// Kill the connection.
+			grpclog.Warningf("grpc: addrConn.transportMonitor didn't get server preface after waiting. Closing the new transport now.")
+			t.Close()
+		}
+		if timer != nil {
+			timer.Stop()
+		}
+		// If a GoAway happened, regardless of error, adjust our keepalive
+		// parameters as appropriate.
+		select {
+		case <-t.GoAway():
+			ac.adjustParams(t.GetGoAwayReason())
+		default:
+		}
+		ac.mu.Lock()
+		if ac.state == connectivity.Shutdown {
+			ac.mu.Unlock()
+			return
+		}
+		// Set connectivity state to TransientFailure before calling
+		// resetTransport. Transition READY->CONNECTING is not valid.
+		ac.state = connectivity.TransientFailure
+		ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
+		ac.cc.resolveNow(resolver.ResolveNowOption{})
+		ac.curAddr = resolver.Address{}
+		ac.mu.Unlock()
+		if err := ac.resetTransport(); err != nil {
+			ac.mu.Lock()
+			ac.printf("transport exiting: %v", err)
+			ac.mu.Unlock()
+			grpclog.Warningf("grpc: addrConn.transportMonitor exits due to: %v", err)
+			if err != errConnClosing {
+				// Keep this ac in cc.conns, to get the reason it's torn down.
+				ac.tearDown(err)
+			}
+			return
+		}
 	}
-	return nil
 }

 // getReadyTransport returns the transport if ac's state is READY.
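transportMonitor, restored above, is at heart a select loop over three wake-up sources: a GOAWAY, a transport error, and the handshake deadline. A minimal, hypothetical sketch of one iteration; monitorOnce and its hand-fed channels stand in for t.GoAway(), t.Error() and ac.connectDeadline, and are not grpc-go APIs:

package main

import (
	"fmt"
	"time"
)

// monitorOnce blocks until the transport reports a GoAway, an error,
// or the handshake deadline passes, mirroring the monitor's select.
func monitorOnce(goAway, errCh <-chan struct{}, deadline time.Time) string {
	var cdeadline <-chan time.Time
	if !deadline.IsZero() {
		timer := time.NewTimer(time.Until(deadline))
		defer timer.Stop()
		cdeadline = timer.C
	}
	select {
	case <-goAway:
		return "goaway: drain and redial"
	case <-errCh:
		return "error: close and redial"
	case <-cdeadline:
		return "no preface before deadline: kill transport"
	}
}

func main() {
	errCh := make(chan struct{})
	close(errCh) // simulate the transport failing
	// nil channels never fire in a select, so only errCh can wake us here.
	fmt.Println(monitorOnce(nil, errCh, time.Time{}))
}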
@@ -1141,7 +1134,7 @@ func (ac *addrConn) nextAddr() error {
 // If ac's state is IDLE, it will trigger ac to connect.
 func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
 	ac.mu.Lock()
-	if ac.state == connectivity.Ready && ac.transport != nil {
+	if ac.state == connectivity.Ready {
 		t := ac.transport
 		ac.mu.Unlock()
 		return t, true
@@ -1166,26 +1159,21 @@ func (ac *addrConn) getReadyTransport() (transport.ClientTransport, bool) {
 func (ac *addrConn) tearDown(err error) {
 	ac.cancel()
 	ac.mu.Lock()
+	defer ac.mu.Unlock()
 	if ac.state == connectivity.Shutdown {
-		ac.mu.Unlock()
 		return
 	}
-	// We have to set the state to Shutdown before we call ac.transports.GracefulClose, because it signals to
-	// onClose not to try reconnecting the transport.
-	ac.state = connectivity.Shutdown
-	ac.tearDownErr = err
-	ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
 	ac.curAddr = resolver.Address{}
 	if err == errConnDrain && ac.transport != nil {
 		// GracefulClose(...) may be executed multiple times when
 		// i) receiving multiple GoAway frames from the server; or
 		// ii) there are concurrent name resolver/Balancer triggered
 		// address removal and GoAway.
-		// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
-		ac.mu.Unlock()
 		ac.transport.GracefulClose()
-		ac.mu.Lock()
 	}
+	ac.state = connectivity.Shutdown
+	ac.tearDownErr = err
+	ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
 	if ac.events != nil {
 		ac.events.Finish()
 		ac.events = nil
@@ -1197,7 +1185,6 @@ func (ac *addrConn) tearDown(err error) {
 	if channelz.IsOn() {
 		channelz.RemoveEntry(ac.channelzID)
 	}
-	ac.mu.Unlock()
 }

 func (ac *addrConn) getState() connectivity.State {
http2_client.go

@@ -124,9 +124,6 @@ type http2Client struct {
 	msgRecv     int64
 	lastMsgSent time.Time
 	lastMsgRecv time.Time
-
-	onGoAway func(GoAwayReason)
-	onClose  func()
 }

 func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
@@ -155,7 +152,7 @@ func isTemporary(err error) bool {
 // newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
 // and starts to receive messages on it. Non-nil error returns if construction
 // fails.
-func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
+func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func()) (_ *http2Client, err error) {
 	scheme := "http"
 	ctx, cancel := context.WithCancel(ctx)
 	defer func() {
@@ -237,8 +234,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
 		maxConcurrentStreams:  defaultMaxStreamsClient,
 		streamQuota:           defaultMaxStreamsClient,
 		streamsQuotaAvailable: make(chan struct{}, 1),
-		onGoAway:              onGoAway,
-		onClose:               func() {},
 	}
 	t.controlBuf = newControlBuffer(t.ctxDone)
 	if opts.InitialWindowSize >= defaultWindowSize {
@@ -271,6 +266,10 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
 		t.keepaliveEnabled = true
 		go t.keepalive()
 	}
+	// Start the reader goroutine for incoming message. Each transport has
+	// a dedicated goroutine which reads HTTP2 frame from network. Then it
+	// dispatches the frame to the corresponding stream entity.
+	go t.reader()
 	// Send connection preface to server.
 	n, err := t.conn.Write(clientPreface)
 	if err != nil {
@@ -307,15 +306,6 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
 			return nil, connectionErrorf(true, err, "transport: failed to write window update: %v", err)
 		}
 	}
-
-	// We only assign onClose after we're sure there can not be any more t.Close calls in this goroutine, because
-	// onClose may (frequently does) block.
-	t.onClose = onClose
-
-	// Start the reader goroutine for incoming message. Each transport has
-	// a dedicated goroutine which reads HTTP2 frame from network. Then it
-	// dispatches the frame to the corresponding stream entity.
-	go t.reader()
 	t.framer.writer.Flush()
 	go func() {
 		t.loopy = newLoopyWriter(clientSide, t.framer, t.controlBuf, t.bdpEst)
@@ -745,10 +735,6 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
 // Close kicks off the shutdown process of the transport. This should be called
 // only once on a transport. Once it is called, the transport should not be
 // accessed any more.
-//
-// This method blocks until the addrConn that initiated this transport is
-// re-connected. This happens because t.onClose() begins reconnect logic at the
-// addrConn level and blocks until the addrConn is successfully connected.
 func (t *http2Client) Close() error {
 	t.mu.Lock()
 	// Make sure we only Close once.
@@ -776,7 +762,6 @@ func (t *http2Client) Close() error {
 		}
 		t.statsHandler.HandleConn(t.ctx, connEnd)
 	}
-	t.onClose()
 	return err
 }

@@ -1073,9 +1058,6 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
 		close(t.goAway)
 		t.state = draining
 		t.controlBuf.put(&incomingGoAway{})
-
-		// This has to be a new goroutine because we're still using the current goroutine to read in the transport.
-		t.onGoAway(t.goAwayReason)
 	}
 	// All streams with IDs greater than the GoAwayId
 	// and smaller than the previous GoAway ID should be killed.
@@ -1192,16 +1174,15 @@ func (t *http2Client) reader() {
 	// Check the validity of server preface.
 	frame, err := t.framer.fr.ReadFrame()
 	if err != nil {
-		t.Close() // this kicks off resetTransport, so must be last before return
+		t.Close()
 		return
 	}
-	t.conn.SetReadDeadline(time.Time{}) // reset deadline once we get the settings frame (we didn't time out, yay!)
 	if t.keepaliveEnabled {
 		atomic.CompareAndSwapUint32(&t.activity, 0, 1)
 	}
 	sf, ok := frame.(*http2.SettingsFrame)
 	if !ok {
-		t.Close() // this kicks off resetTransport, so must be last before return
+		t.Close()
 		return
 	}
 	t.onSuccess()
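The reader changes above drop the onClose hook and keep the preface check simple: read the first frame, require a settings frame, then signal success. A toy sketch of that gate, with newline-delimited strings standing in for HTTP/2 frames (reader here is illustrative, not the real t.reader):

package main

import (
	"bufio"
	"fmt"
	"strings"
)

// reader validates the server preface: the first frame must be a
// settings frame; anything else, or a read error, closes the transport.
func reader(r *bufio.Reader, onSuccess func(), closeTransport func()) {
	first, err := r.ReadString('\n')
	if err != nil || strings.TrimSpace(first) != "SETTINGS" {
		closeTransport()
		return
	}
	onSuccess() // server preface received; the connection is usable
}

func main() {
	in := bufio.NewReader(strings.NewReader("SETTINGS\n"))
	reader(in,
		func() { fmt.Println("handshake ok") },
		func() { fmt.Println("closed") })
}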
transport.go

@@ -495,8 +495,8 @@ type TargetInfo struct {

 // NewClientTransport establishes the transport with the required ConnectOptions
 // and returns it to the caller.
-func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
-	return newHTTP2Client(connectCtx, ctx, target, opts, onSuccess, onGoAway, onClose)
+func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func()) (ClientTransport, error) {
+	return newHTTP2Client(connectCtx, ctx, target, opts, onSuccess)
 }

 // Options provides additional hints and information for message
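The NewClientTransport signature shrinks because the revert swaps callback-style lifecycle reporting (onGoAway, onClose) back to channel-style reporting that transportMonitor can select on. A hypothetical Go sketch contrasting the two shapes; both transport types below are illustrative, not grpc-go types:

package main

import "fmt"

// Callback style (the reverted #2219 design): the creator passes hooks
// and the transport calls back into its owner.
type callbackTransport struct{ onClose func() }

func newCallbackTransport(onClose func()) *callbackTransport {
	return &callbackTransport{onClose: onClose}
}
func (t *callbackTransport) Close() { t.onClose() }

// Channel style (the restored design): the owner selects on channels,
// as transportMonitor does with t.GoAway() and t.Error().
type channelTransport struct{ errCh chan struct{} }

func newChannelTransport() *channelTransport {
	return &channelTransport{errCh: make(chan struct{})}
}
func (t *channelTransport) Error() <-chan struct{} { return t.errCh }
func (t *channelTransport) Close()                 { close(t.errCh) }

func main() {
	cb := newCallbackTransport(func() { fmt.Println("callback: closed") })
	cb.Close()

	ch := newChannelTransport()
	go ch.Close()
	<-ch.Error() // the monitor goroutine wakes up here
	fmt.Println("channel: closed")
}

The channel style keeps reconnect logic in one goroutine instead of running it inside transport callbacks, which is where the deadlock reported in #2251 arose.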
transport_test.go

@@ -397,17 +397,17 @@ func setUpServerOnly(t *testing.T, port int, serverConfig *ServerConfig, ht hTyp
 }

 func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, *http2Client, func()) {
-	return setUpWithOptions(t, port, &ServerConfig{MaxStreams: maxStreams}, ht, ConnectOptions{})
+	return setUpWithOptions(t, port, &ServerConfig{MaxStreams: maxStreams}, ht, ConnectOptions{}, func() {})
 }

-func setUpWithOptions(t *testing.T, port int, serverConfig *ServerConfig, ht hType, copts ConnectOptions) (*server, *http2Client, func()) {
+func setUpWithOptions(t *testing.T, port int, serverConfig *ServerConfig, ht hType, copts ConnectOptions, onHandshake func()) (*server, *http2Client, func()) {
 	server := setUpServerOnly(t, port, serverConfig, ht)
 	addr := "localhost:" + server.port
 	target := TargetInfo{
 		Addr: addr,
 	}
 	connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
-	ct, connErr := NewClientTransport(connectCtx, context.Background(), target, copts, func() {}, func(GoAwayReason) {}, func() {})
+	ct, connErr := NewClientTransport(connectCtx, context.Background(), target, copts, onHandshake)
 	if connErr != nil {
 		cancel() // Do not cancel in success path.
 		t.Fatalf("failed to create transport: %v", connErr)
@@ -432,7 +432,7 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con
 		done <- conn
 	}()
 	connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
-	tr, err := NewClientTransport(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts, func() {}, func(GoAwayReason) {}, func() {})
+	tr, err := NewClientTransport(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts, func() {})
 	if err != nil {
 		cancel() // Do not cancel in success path.
 		// Server clean-up.
@@ -449,7 +449,7 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con
 // sends status error to concurrent stream reader.
 func TestInflightStreamClosing(t *testing.T) {
 	serverConfig := &ServerConfig{}
-	server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+	server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
 	defer cancel()
 	defer server.stop()
 	defer client.Close()
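The test helpers now thread an explicit handshake callback through to the transport, with func() {} as the no-op default. A hypothetical sketch of the same pattern; dialForTest is illustrative, not a grpc-go helper:

package main

import "fmt"

// dialForTest mirrors the shape of setUpWithOptions after the revert:
// an explicit onHandshake hook, with func() {} as the no-op default.
func dialForTest(addr string, onHandshake func()) string {
	// ... connect to addr ...
	onHandshake() // invoked once the server preface arrives
	return "client for " + addr
}

func main() {
	// Most tests pass a no-op:
	_ = dialForTest("localhost:0", func() {})

	// A handshake-sensitive test can block until the callback fires:
	done := make(chan struct{})
	_ = dialForTest("localhost:0", func() { close(done) })
	<-done
	fmt.Println("handshake observed")
}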
@@ -492,7 +492,7 @@ func TestMaxConnectionIdle(t *testing.T) {
 			MaxConnectionIdle: 2 * time.Second,
 		},
 	}
-	server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+	server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
 	defer cancel()
 	defer server.stop()
 	defer client.Close()
@@ -520,7 +520,7 @@ func TestMaxConnectionIdleNegative(t *testing.T) {
 			MaxConnectionIdle: 2 * time.Second,
 		},
 	}
-	server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+	server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
 	defer cancel()
 	defer server.stop()
 	defer client.Close()
@@ -547,7 +547,7 @@ func TestMaxConnectionAge(t *testing.T) {
 			MaxConnectionAge: 2 * time.Second,
 		},
 	}
-	server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+	server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
 	defer cancel()
 	defer server.stop()
 	defer client.Close()
@@ -580,7 +580,7 @@ func TestKeepaliveServer(t *testing.T) {
 			Timeout: 1 * time.Second,
 		},
 	}
-	server, c, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+	server, c, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
 	defer cancel()
 	defer server.stop()
 	defer c.Close()
@@ -624,7 +624,7 @@ func TestKeepaliveServerNegative(t *testing.T) {
 			Timeout: 1 * time.Second,
 		},
 	}
-	server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+	server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
 	defer cancel()
 	defer server.stop()
 	defer client.Close()
@@ -718,7 +718,7 @@ func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
 		Time:                2 * time.Second, // Keepalive time = 2 sec.
 		Timeout:             1 * time.Second, // Keepalive timeout = 1 sec.
 		PermitWithoutStream: true,            // Run keepalive even with no RPCs.
-	}})
+	}}, func() {})
 	defer cancel()
 	defer s.stop()
 	defer tr.Close()
@@ -745,7 +745,7 @@ func TestKeepaliveServerEnforcementWithAbusiveClientNoRPC(t *testing.T) {
 			PermitWithoutStream: true,
 		},
 	}
-	server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
+	server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions, func() {})
 	defer cancel()
 	defer server.stop()
 	defer client.Close()
@@ -779,7 +779,7 @@ func TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) {
 			Timeout: 1 * time.Second,
 		},
 	}
-	server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
+	server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions, func() {})
 	defer cancel()
 	defer server.stop()
 	defer client.Close()
@@ -818,7 +818,7 @@ func TestKeepaliveServerEnforcementWithObeyingClientNoRPC(t *testing.T) {
 			PermitWithoutStream: true,
 		},
 	}
-	server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions)
+	server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions, func() {})
 	defer cancel()
 	defer server.stop()
 	defer client.Close()
@@ -845,7 +845,7 @@ func TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) {
 			Timeout: 1 * time.Second,
 		},
 	}
-	server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions)
+	server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions, func() {})
 	defer cancel()
 	defer server.stop()
 	defer client.Close()
@@ -994,7 +994,7 @@ func TestLargeMessageWithDelayRead(t *testing.T) {
 		InitialWindowSize:     defaultWindowSize,
 		InitialConnWindowSize: defaultWindowSize,
 	}
-	server, ct, cancel := setUpWithOptions(t, 0, sc, delayRead, co)
+	server, ct, cancel := setUpWithOptions(t, 0, sc, delayRead, co, func() {})
 	defer cancel()
 	defer server.stop()
 	defer ct.Close()
@@ -1083,7 +1083,6 @@ func TestLargeMessageWithDelayRead(t *testing.T) {

 func TestGracefulClose(t *testing.T) {
 	server, ct, cancel := setUp(t, 0, math.MaxUint32, pingpong)
-	defer cancel()
 	defer func() {
 		// Stop the server's listener to make the server's goroutines terminate
 		// (after the last active stream is done).
@@ -1093,6 +1092,7 @@ func TestGracefulClose(t *testing.T) {
 		leakcheck.Check(t)
 		// Correctly clean up the server
 		server.stop()
+		cancel()
 	}()
 	ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
 	defer cancel()
@@ -1147,8 +1147,8 @@ func TestGracefulClose(t *testing.T) {
 }

 func TestLargeMessageSuspension(t *testing.T) {
-	server, ct, cancel := setUp(t, 0, math.MaxUint32, suspended)
-	defer cancel()
+	server, ct, cancelsvr := setUp(t, 0, math.MaxUint32, suspended)
+	defer cancelsvr()
 	callHdr := &CallHdr{
 		Host:   "localhost",
 		Method: "foo.Large",
@@ -1185,7 +1185,7 @@ func TestMaxStreams(t *testing.T) {
 	serverConfig := &ServerConfig{
 		MaxStreams: 1,
 	}
-	server, ct, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+	server, ct, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
 	defer cancel()
 	defer ct.Close()
 	defer server.stop()
@@ -1319,7 +1319,7 @@ func TestClientConnDecoupledFromApplicationRead(t *testing.T) {
 		InitialWindowSize:     defaultWindowSize,
 		InitialConnWindowSize: defaultWindowSize,
 	}
-	server, client, cancel := setUpWithOptions(t, 0, &ServerConfig{}, notifyCall, connectOptions)
+	server, client, cancel := setUpWithOptions(t, 0, &ServerConfig{}, notifyCall, connectOptions, func() {})
 	defer cancel()
 	defer server.stop()
 	defer client.Close()
@@ -1406,7 +1406,7 @@ func TestServerConnDecoupledFromApplicationRead(t *testing.T) {
 		InitialWindowSize:     defaultWindowSize,
 		InitialConnWindowSize: defaultWindowSize,
 	}
-	server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
+	server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
 	defer cancel()
 	defer server.stop()
 	defer client.Close()
@@ -1636,7 +1636,7 @@ func TestClientWithMisbehavedServer(t *testing.T) {
 	}()
 	connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
 	defer cancel()
-	ct, err := NewClientTransport(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, func() {}, func(GoAwayReason) {}, func() {})
+	ct, err := NewClientTransport(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, func() {})
 	if err != nil {
 		t.Fatalf("Error while creating client transport: %v", err)
 	}
@@ -1799,7 +1799,7 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig)
 		InitialWindowSize:     wc.clientStream,
 		InitialConnWindowSize: wc.clientConn,
 	}
-	server, client, cancel := setUpWithOptions(t, 0, sc, pingpong, co)
+	server, client, cancel := setUpWithOptions(t, 0, sc, pingpong, co, func() {})
 	defer cancel()
 	defer server.stop()
 	defer client.Close()
@@ -2031,7 +2031,7 @@ func (s *httpServer) cleanUp() {
 	}
 }

-func setUpHTTPStatusTest(t *testing.T, httpStatus int, wh writeHeaders) (stream *Stream, cleanUp func()) {
+func setUpHTTPStatusTest(t *testing.T, httpStatus int, wh writeHeaders) (stream *Stream, cleanUp func(), cancel func()) {
 	var (
 		err error
 		lis net.Listener
@@ -2063,8 +2063,9 @@ func setUpHTTPStatusTest(t *testing.T, httpStatus int, wh writeHeaders) (stream
 		wh: wh,
 	}
 	server.start(t, lis)
-	connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
-	client, err = newHTTP2Client(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, func() {}, func(GoAwayReason) {}, func() {})
+	var connectCtx context.Context
+	connectCtx, cancel = context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
+	client, err = newHTTP2Client(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, func() {})
 	if err != nil {
 		cancel() // Do not cancel in success path.
 		t.Fatalf("Error creating client. Err: %v", err)
@@ -2083,7 +2084,8 @@ func TestHTTPToGRPCStatusMapping(t *testing.T) {
 }

 func testHTTPToGRPCStatusMapping(t *testing.T, httpStatus int, wh writeHeaders) {
-	stream, cleanUp := setUpHTTPStatusTest(t, httpStatus, wh)
+	stream, cleanUp, cancel := setUpHTTPStatusTest(t, httpStatus, wh)
+	defer cancel()
 	defer cleanUp()
 	want := httpStatusConvTab[httpStatus]
 	buf := make([]byte, 8)
@@ -2101,7 +2103,8 @@ func testHTTPToGRPCStatusMapping(t *testing.T, httpStatus int, wh writeHeaders)
 }

 func TestHTTPStatusOKAndMissingGRPCStatus(t *testing.T) {
-	stream, cleanUp := setUpHTTPStatusTest(t, http.StatusOK, writeOneHeader)
+	stream, cleanUp, cancel := setUpHTTPStatusTest(t, http.StatusOK, writeOneHeader)
+	defer cancel()
 	defer cleanUp()
 	buf := make([]byte, 8)
 	_, err := stream.Read(buf)
status.go

@@ -126,9 +126,7 @@ func FromError(err error) (s *Status, ok bool) {
 	if err == nil {
 		return &Status{s: &spb.Status{Code: int32(codes.OK)}}, true
 	}
-	if se, ok := err.(interface {
-		GRPCStatus() *Status
-	}); ok {
+	if se, ok := err.(interface{ GRPCStatus() *Status }); ok {
 		return se.GRPCStatus(), true
 	}
 	return New(codes.Unknown, err.Error()), false
@@ -184,9 +182,7 @@ func Code(err error) codes.Code {
 	if err == nil {
 		return codes.OK
 	}
-	if se, ok := err.(interface {
-		GRPCStatus() *Status
-	}); ok {
+	if se, ok := err.(interface{ GRPCStatus() *Status }); ok {
 		return se.GRPCStatus().Code()
 	}
 	return codes.Unknown
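The status.go hunks are pure reformatting: the multi-line anonymous-interface literal collapses to one line. The assertion idiom itself is worth a runnable illustration; Status, statusError and the code values here are simplified stand-ins for the real package:

package main

import (
	"errors"
	"fmt"
)

type Status struct{ code int32 }

type statusError struct{ st *Status }

func (e *statusError) Error() string       { return "status error" }
func (e *statusError) GRPCStatus() *Status { return e.st }

// Code mirrors the reformatted status.Code: a one-line type assertion
// against an anonymous interface picks out any error carrying a Status.
func Code(err error) int32 {
	if err == nil {
		return 0 // codes.OK
	}
	if se, ok := err.(interface{ GRPCStatus() *Status }); ok {
		return se.GRPCStatus().code
	}
	return 2 // codes.Unknown
}

func main() {
	fmt.Println(Code(&statusError{st: &Status{code: 5}})) // 5
	fmt.Println(Code(errors.New("plain")))                // 2
}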