internal: refactor transport to single retry mechanism (#2461)

Previously, the transport was able to reset via the retry loop,
or via the event closures calling resetTransport. This meant
a very large amount of synchronization was necessary: one
reset meant the other had to not reset; state had to be kept
at the addrconn; and very subtle interactions were hard to
reason about.

This change removes the ability for event closures to directly
reset the transport. Instead, they signal to to the retry
loop about the event, and the retry loop is always the single
place that retries occur.

This also allows us to refactor the address switching logic
into a much simpler for loop inside the retry loop instead of
using addrConn state to keep track of an index.
This commit is contained in:
Jean de Klerk 2018-12-17 13:10:13 -08:00 committed by GitHub
parent 42df0c551e
commit 1b41b79fd1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 419 additions and 307 deletions

View File

@ -591,13 +591,12 @@ func (cc *ClientConn) handleSubConnStateChange(sc balancer.SubConn, s connectivi
// Caller needs to make sure len(addrs) > 0.
func (cc *ClientConn) newAddrConn(addrs []resolver.Address, opts balancer.NewSubConnOptions) (*addrConn, error) {
ac := &addrConn{
cc: cc,
addrs: addrs,
scopts: opts,
dopts: cc.dopts,
czData: new(channelzData),
successfulHandshake: true, // make the first nextAddr() call _not_ move addrIdx up by 1
resetBackoff: make(chan struct{}),
cc: cc,
addrs: addrs,
scopts: opts,
dopts: cc.dopts,
czData: new(channelzData),
resetBackoff: make(chan struct{}),
}
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
// Track ac in cc. This needs to be done before any getTransport(...) is called.
@ -679,11 +678,10 @@ func (ac *addrConn) connect() error {
return nil
}
ac.updateConnectivityState(connectivity.Connecting)
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.mu.Unlock()
// Start a goroutine connecting to the server asynchronously.
go ac.resetTransport(false)
go ac.resetTransport()
return nil
}
@ -702,6 +700,12 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
return true
}
// Unless we're busy reconnecting already, let's reconnect from the top of
// the list.
if ac.state != connectivity.Ready {
return false
}
var curAddrFound bool
for _, a := range addrs {
if reflect.DeepEqual(ac.curAddr, a) {
@ -712,7 +716,6 @@ func (ac *addrConn) tryUpdateAddrs(addrs []resolver.Address) bool {
grpclog.Infof("addrConn: tryUpdateAddrs curAddrFound: %v", curAddrFound)
if curAddrFound {
ac.addrs = addrs
ac.addrIdx = 0 // Start reconnecting from beginning in the new list.
}
return curAddrFound
@ -912,7 +915,6 @@ type addrConn struct {
transport transport.ClientTransport // The current transport.
mu sync.Mutex
addrIdx int // The index in addrs list to start reconnecting from.
curAddr resolver.Address // The current address.
addrs []resolver.Address // All addresses that the resolver resolved to.
@ -921,33 +923,30 @@ type addrConn struct {
tearDownErr error // The reason this addrConn is torn down.
backoffIdx int
// backoffDeadline is the time until which resetTransport needs to
// wait before increasing backoffIdx count.
backoffDeadline time.Time
// connectDeadline is the time by which all connection
// negotiations must complete.
connectDeadline time.Time
backoffIdx int // Needs to be stateful for resetConnectBackoff.
resetBackoff chan struct{}
channelzID int64 // channelz unique identification number
czData *channelzData
successfulHandshake bool
channelzID int64 // channelz unique identification number.
czData *channelzData
healthCheckEnabled bool
}
// Note: this requires a lock on ac.mu.
func (ac *addrConn) updateConnectivityState(s connectivity.State) {
if ac.state == s {
return
}
updateMsg := fmt.Sprintf("Subchannel Connectivity change to %v", s)
grpclog.Infof(updateMsg)
ac.state = s
if channelz.IsOn() {
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Subchannel Connectivity change to %v", s),
Desc: updateMsg,
Severity: channelz.CtINFO,
})
}
ac.cc.handleSubConnStateChange(ac.acbw, s)
}
// adjustParams updates parameters used to create transports upon
@ -964,173 +963,219 @@ func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
}
}
// resetTransport makes sure that a healthy ac.transport exists.
//
// The transport will close itself when it encounters an error, or on GOAWAY, or on deadline waiting for handshake, or
// when the clientconn is closed. Each iteration creating a new transport will try a different address that the balancer
// assigned to the addrConn, until it has tried all addresses. Once it has tried all addresses, it will re-resolve to
// get a new address list. If an error is received, the list is re-resolved and the next reset attempt will try from the
// beginning. This method has backoff built in. The backoff amount starts at 0 and increases each time resolution occurs
// (addresses are exhausted). The backoff amount is reset to 0 each time a handshake is received.
//
// If the DialOption WithWaitForHandshake was set, resetTransport returns successfully only after handshake is received.
func (ac *addrConn) resetTransport(resolveNow bool) {
for {
// If this is the first in a line of resets, we want to resolve immediately. The only other time we
// want to reset is if we have tried all the addresses handed to us.
if resolveNow {
ac.mu.Lock()
func (ac *addrConn) resetTransport() {
for i := 0; ; i++ {
tryNextAddrFromStart := grpcsync.NewEvent()
ac.mu.Lock()
if i > 0 {
ac.cc.resolveNow(resolver.ResolveNowOption{})
ac.mu.Unlock()
}
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}
// The transport that was used before is no longer viable.
ac.transport = nil
// If the connection is READY, a failure must have occurred.
// Otherwise, we'll consider this is a transient failure when:
// We've exhausted all addresses
// We're in CONNECTING
// And it's not the very first addr to try TODO(deklerk) find a better way to do this than checking ac.successfulHandshake
if ac.state == connectivity.Ready || (ac.addrIdx == len(ac.addrs)-1 && ac.state == connectivity.Connecting && !ac.successfulHandshake) {
ac.updateConnectivityState(connectivity.TransientFailure)
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
}
ac.transport = nil
addrs := ac.addrs
backoffFor := ac.dopts.bs.Backoff(ac.backoffIdx)
ac.mu.Unlock()
if err := ac.nextAddr(); err != nil {
return
}
addrLoop:
for _, addr := range addrs {
ac.mu.Lock()
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}
backoffIdx := ac.backoffIdx
backoffFor := ac.dopts.bs.Backoff(backoffIdx)
// This will be the duration that dial gets to finish.
dialDuration := getMinConnectTimeout()
if backoffFor > dialDuration {
// Give dial more time as we keep failing to connect.
dialDuration = backoffFor
}
start := time.Now()
connectDeadline := start.Add(dialDuration)
ac.backoffDeadline = start.Add(backoffFor)
ac.connectDeadline = connectDeadline
ac.mu.Unlock()
ac.cc.mu.RLock()
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
ac.cc.mu.RUnlock()
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}
if ac.state != connectivity.Connecting {
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}
ac.updateConnectivityState(connectivity.Connecting)
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.transport = nil
ac.mu.Unlock()
// This will be the duration that dial gets to finish.
dialDuration := getMinConnectTimeout()
if dialDuration < backoffFor {
// Give dial more time as we keep failing to connect.
dialDuration = backoffFor
}
connectDeadline := time.Now().Add(dialDuration)
ac.mu.Lock()
ac.cc.mu.RLock()
ac.dopts.copts.KeepaliveParams = ac.cc.mkp
ac.cc.mu.RUnlock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}
copts := ac.dopts.copts
if ac.scopts.CredsBundle != nil {
copts.CredsBundle = ac.scopts.CredsBundle
}
hctx, hcancel := context.WithCancel(ac.ctx)
defer hcancel()
ac.mu.Unlock()
if channelz.IsOn() {
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
Severity: channelz.CtINFO,
})
}
reconnect := grpcsync.NewEvent()
prefaceReceived := make(chan struct{})
newTr, err := ac.createTransport(addr, copts, connectDeadline, reconnect, prefaceReceived)
if err == nil {
ac.mu.Lock()
ac.curAddr = addr
ac.transport = newTr
ac.mu.Unlock()
healthCheckConfig := ac.cc.healthCheckConfig()
// LB channel health checking is only enabled when all the four requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
// 3. a service config with non-empty healthCheckConfig field is provided,
// 4. the current load balancer allows it.
healthcheckManagingState := false
if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
if ac.cc.dopts.healthCheckFunc == nil {
// TODO: add a link to the health check doc in the error message.
grpclog.Error("the client side LB channel health check function has not been set.")
} else {
// TODO(deklerk) refactor to just return transport
go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
healthcheckManagingState = true
}
}
if !healthcheckManagingState {
ac.mu.Lock()
ac.updateConnectivityState(connectivity.Ready)
ac.mu.Unlock()
}
} else {
hcancel()
if err == errConnClosing {
return
}
if tryNextAddrFromStart.HasFired() {
break addrLoop
}
continue
}
ac.mu.Lock()
reqHandshake := ac.dopts.reqHandshake
ac.mu.Unlock()
<-reconnect.Done()
hcancel()
if reqHandshake == envconfig.RequireHandshakeHybrid {
// In RequireHandshakeHybrid mode, we must check to see whether
// server preface has arrived yet to decide whether to start
// reconnecting at the top of the list (server preface received)
// or continue with the next addr in the list as if the
// connection were not successful (server preface not received).
select {
case <-prefaceReceived:
// We received a server preface - huzzah! We consider this
// a success and restart from the top of the addr list.
ac.mu.Lock()
ac.backoffIdx = 0
ac.mu.Unlock()
break addrLoop
default:
// Despite having set state to READY, in hybrid mode we
// consider this a failure and continue connecting at the
// next addr in the list.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}
ac.updateConnectivityState(connectivity.TransientFailure)
ac.mu.Unlock()
if tryNextAddrFromStart.HasFired() {
break addrLoop
}
}
} else {
// In RequireHandshakeOn mode, we would have already waited for
// the server preface, so we consider this a success and restart
// from the top of the addr list. In RequireHandshakeOff mode,
// we don't care to wait for the server preface before
// considering this a success, so we also restart from the top
// of the addr list.
ac.mu.Lock()
ac.backoffIdx = 0
ac.mu.Unlock()
break addrLoop
}
}
addr := ac.addrs[ac.addrIdx]
copts := ac.dopts.copts
if ac.scopts.CredsBundle != nil {
copts.CredsBundle = ac.scopts.CredsBundle
// After exhausting all addresses, or after need to reconnect after a
// READY, the addrConn enters TRANSIENT_FAILURE.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}
ac.updateConnectivityState(connectivity.TransientFailure)
// Backoff.
b := ac.resetBackoff
timer := time.NewTimer(backoffFor)
acctx := ac.ctx
ac.mu.Unlock()
if channelz.IsOn() {
channelz.AddTraceEvent(ac.channelzID, &channelz.TraceEventDesc{
Desc: fmt.Sprintf("Subchannel picks a new address %q to connect", addr.Addr),
Severity: channelz.CtINFO,
})
select {
case <-timer.C:
ac.mu.Lock()
ac.backoffIdx++
ac.mu.Unlock()
case <-b:
timer.Stop()
case <-acctx.Done():
timer.Stop()
return
}
if err := ac.createTransport(backoffIdx, addr, copts, connectDeadline); err != nil {
continue
}
return
}
}
// createTransport creates a connection to one of the backends in addrs.
func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time) error {
oneReset := sync.Once{}
skipReset := make(chan struct{})
allowedToReset := make(chan struct{})
prefaceReceived := make(chan struct{})
// createTransport creates a connection to one of the backends in addrs. It
// sets ac.transport in the success case, or it returns an error if it was
// unable to successfully create a transport.
//
// If waitForHandshake is enabled, it blocks until server preface arrives.
func (ac *addrConn) createTransport(addr resolver.Address, copts transport.ConnectOptions, connectDeadline time.Time, reconnect *grpcsync.Event, prefaceReceived chan struct{}) (transport.ClientTransport, error) {
onCloseCalled := make(chan struct{})
var prefaceMu sync.Mutex
var serverPrefaceReceived bool
var clientPrefaceWrote bool
hcCtx, hcCancel := context.WithCancel(ac.ctx)
onGoAway := func(r transport.GoAwayReason) {
hcCancel()
ac.mu.Lock()
ac.adjustParams(r)
ac.mu.Unlock()
select {
case <-skipReset: // The outer resetTransport loop will handle reconnection.
return
case <-allowedToReset: // We're in the clear to reset.
go oneReset.Do(func() { ac.resetTransport(false) })
}
}
prefaceTimer := time.NewTimer(connectDeadline.Sub(time.Now()))
onClose := func() {
hcCancel()
close(onCloseCalled)
prefaceTimer.Stop()
select {
case <-skipReset: // The outer resetTransport loop will handle reconnection.
return
case <-allowedToReset: // We're in the clear to reset.
oneReset.Do(func() { ac.resetTransport(false) })
}
}
target := transport.TargetInfo{
Addr: addr.Addr,
Metadata: addr.Metadata,
Authority: ac.cc.authority,
}
prefaceTimer := time.NewTimer(connectDeadline.Sub(time.Now()))
onGoAway := func(r transport.GoAwayReason) {
ac.mu.Lock()
ac.adjustParams(r)
ac.mu.Unlock()
reconnect.Fire()
}
onClose := func() {
close(onCloseCalled)
prefaceTimer.Stop()
reconnect.Fire()
}
onPrefaceReceipt := func() {
close(prefaceReceived)
prefaceTimer.Stop()
// TODO(deklerk): optimization; does anyone else actually use this lock? maybe we can just remove it for this scope
ac.mu.Lock()
prefaceMu.Lock()
serverPrefaceReceived = true
if clientPrefaceWrote {
ac.successfulHandshake = true
}
prefaceMu.Unlock()
ac.mu.Unlock()
}
connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
@ -1142,13 +1187,6 @@ func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
if err == nil {
prefaceMu.Lock()
clientPrefaceWrote = true
if serverPrefaceReceived || ac.dopts.reqHandshake == envconfig.RequireHandshakeOff {
ac.successfulHandshake = true
}
prefaceMu.Unlock()
if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
select {
case <-prefaceTimer.C:
@ -1159,8 +1197,7 @@ func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts
// We got the preface - huzzah! things are good.
case <-onCloseCalled:
// The transport has already closed - noop.
close(allowedToReset)
return nil
return nil, errors.New("connection closed")
}
} else if ac.dopts.reqHandshake == envconfig.RequireHandshakeHybrid {
go func() {
@ -1185,70 +1222,32 @@ func (ac *addrConn) createTransport(backoffNum int, addr resolver.Address, copts
// ac.tearDown(...) has been invoked.
ac.mu.Unlock()
// We don't want to reset during this close because we prefer to kick out of this function and let the loop
// in resetTransport take care of reconnecting.
close(skipReset)
return errConnClosing
return nil, errConnClosing
}
ac.mu.Unlock()
grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v. Err :%v. Reconnecting...", addr, err)
// We don't want to reset during this close because we prefer to kick out of this function and let the loop
// in resetTransport take care of reconnecting.
close(skipReset)
return err
return nil, err
}
// Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
close(skipReset)
newTr.Close()
return nil
return nil, errConnClosing
}
ac.transport = newTr
ac.mu.Unlock()
healthCheckConfig := ac.cc.healthCheckConfig()
// LB channel health checking is only enabled when all the four requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
// 3. a service config with non-empty healthCheckConfig field is provided,
// 4. the current load balancer allows it.
if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
if ac.cc.dopts.healthCheckFunc != nil {
go ac.startHealthCheck(hcCtx, newTr, addr, healthCheckConfig.ServiceName)
close(allowedToReset)
return nil
}
// TODO: add a link to the health check doc in the error message.
grpclog.Error("the client side LB channel health check function has not been set.")
}
// No LB channel health check case
// Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
// unblock onGoAway/onClose callback.
close(skipReset)
return errConnClosing
newTr.Close()
return nil, errConnClosing
}
ac.updateConnectivityState(connectivity.Ready)
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.curAddr = addr
ac.mu.Unlock()
// Ok, _now_ we will finally let the transport reset if it encounters a closable error. Without this, the reader
// goroutine failing races with all the code in this method that sets the connection to "ready".
close(allowedToReset)
return nil
return newTr, nil
}
func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.ClientTransport, addr resolver.Address, serviceName string) {
@ -1268,18 +1267,11 @@ func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.Client
firstReady = false
ac.curAddr = addr
}
if ac.state != connectivity.Ready {
ac.updateConnectivityState(connectivity.Ready)
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
}
ac.updateConnectivityState(connectivity.Ready)
} else {
if ac.state != connectivity.TransientFailure {
ac.updateConnectivityState(connectivity.TransientFailure)
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
}
ac.updateConnectivityState(connectivity.TransientFailure)
}
}
err := ac.cc.dopts.healthCheckFunc(ctx, newStream, reportHealth, serviceName)
if err != nil {
if status.Code(err) == codes.Unimplemented {
@ -1296,55 +1288,6 @@ func (ac *addrConn) startHealthCheck(ctx context.Context, newTr transport.Client
}
}
// nextAddr increments the addrIdx if there are more addresses to try. If
// there are no more addrs to try it will re-resolve, set addrIdx to 0, and
// increment the backoffIdx.
//
// nextAddr must be called without ac.mu being held.
func (ac *addrConn) nextAddr() error {
ac.mu.Lock()
// If a handshake has been observed, we want the next usage to start at
// index 0 immediately.
if ac.successfulHandshake {
ac.successfulHandshake = false
ac.backoffDeadline = time.Time{}
ac.connectDeadline = time.Time{}
ac.addrIdx = 0
ac.backoffIdx = 0
ac.mu.Unlock()
return nil
}
if ac.addrIdx < len(ac.addrs)-1 {
ac.addrIdx++
ac.mu.Unlock()
return nil
}
ac.addrIdx = 0
ac.backoffIdx++
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return errConnClosing
}
ac.cc.resolveNow(resolver.ResolveNowOption{})
backoffDeadline := ac.backoffDeadline
b := ac.resetBackoff
ac.mu.Unlock()
timer := time.NewTimer(backoffDeadline.Sub(time.Now()))
select {
case <-timer.C:
case <-b:
timer.Stop()
case <-ac.ctx.Done():
timer.Stop()
return ac.ctx.Err()
}
return nil
}
func (ac *addrConn) resetConnectBackoff() {
ac.mu.Lock()
close(ac.resetBackoff)
@ -1393,7 +1336,6 @@ func (ac *addrConn) tearDown(err error) {
ac.updateConnectivityState(connectivity.Shutdown)
ac.cancel()
ac.tearDownErr = err
ac.cc.handleSubConnStateChange(ac.acbw, ac.state)
ac.curAddr = resolver.Address{}
if err == errConnDrain && curTr != nil {
// GracefulClose(...) may be executed multiple times when

View File

@ -43,8 +43,9 @@ func init() {
balancer.Register(testBalancer)
}
// These tests use a pipeListener. This listener is similar to net.Listener except that it is unbuffered, so each read
// and write will wait for the other side's corresponding write or read.
// These tests use a pipeListener. This listener is similar to net.Listener
// except that it is unbuffered, so each read and write will wait for the other
// side's corresponding write or read.
func TestStateTransitions_SingleAddress(t *testing.T) {
defer leakcheck.Check(t)
@ -201,7 +202,7 @@ func testStateTransitionSingleAddress(t *testing.T, want []connectivity.State, s
}
// When a READY connection is closed, the client enters TRANSIENT FAILURE before CONNECTING.
func TestStateTransition_ReadyToTransientFailure(t *testing.T) {
func TestStateTransitions_ReadyToTransientFailure(t *testing.T) {
defer leakcheck.Check(t)
want := []connectivity.State{
@ -270,8 +271,8 @@ func TestStateTransition_ReadyToTransientFailure(t *testing.T) {
}
}
// When the first connection is closed, the client enters stays in CONNECTING until it tries the second
// address (which succeeds, and then it enters READY).
// When the first connection is closed, the client enters stays in CONNECTING
// until it tries the second address (which succeeds, and then it enters READY).
func TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) {
defer leakcheck.Check(t)
@ -366,8 +367,9 @@ func TestStateTransitions_TriesAllAddrsBeforeTransientFailure(t *testing.T) {
}
}
// When there are multiple addresses, and we enter READY on one of them, a later closure should cause
// the client to enter TRANSIENT FAILURE before it re-enters CONNECTING.
// When there are multiple addresses, and we enter READY on one of them, a
// later closure should cause the client to enter TRANSIENT FAILURE before it
// re-enters CONNECTING.
func TestStateTransitions_MultipleAddrsEntersReady(t *testing.T) {
defer leakcheck.Check(t)
@ -505,9 +507,10 @@ type noBackoff struct{}
func (b noBackoff) Backoff(int) time.Duration { return time.Duration(0) }
// Keep reading until something causes the connection to die (EOF, server closed, etc). Useful
// as a tool for mindlessly keeping the connection healthy, since the client will error if
// things like client prefaces are not accepted in a timely fashion.
// Keep reading until something causes the connection to die (EOF, server
// closed, etc). Useful as a tool for mindlessly keeping the connection
// healthy, since the client will error if things like client prefaces are not
// accepted in a timely fashion.
func keepReading(conn net.Conn) {
buf := make([]byte, 1024)
for _, err := conn.Read(buf); err == nil; _, err = conn.Read(buf) {

View File

@ -285,7 +285,8 @@ func TestDialWaitsForServerSettingsAndFails(t *testing.T) {
defer conn.Close()
}
}()
getMinConnectTimeout = func() time.Duration { return time.Second / 4 }
cleanup := setMinConnectTimeout(time.Second / 4)
defer cleanup()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithWaitForHandshake(), WithBlock(), withBackoff(noBackoff{}))
@ -331,7 +332,8 @@ func TestDialWaitsForServerSettingsViaEnvAndFails(t *testing.T) {
defer conn.Close()
}
}()
getMinConnectTimeout = func() time.Duration { return time.Second / 4 }
cleanup := setMinConnectTimeout(time.Second / 4)
defer cleanup()
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithBlock(), withBackoff(noBackoff{}))
@ -404,12 +406,9 @@ func TestCloseConnectionWhenServerPrefaceNotReceived(t *testing.T) {
// 2. After minConnectTimeout(500 ms here), client disconnects and retries.
// 3. The new server sends its preface.
// 4. Client doesn't kill the connection this time.
mctBkp := getMinConnectTimeout()
defer func() {
atomic.StoreInt64((*int64)(&mutableMinConnectTimeout), int64(mctBkp))
}()
defer leakcheck.Check(t)
atomic.StoreInt64((*int64)(&mutableMinConnectTimeout), int64(time.Millisecond)*500)
cleanup := setMinConnectTimeout(time.Millisecond * 500)
defer cleanup()
// Test with "on" and "hybrid".
for _, setting := range reqHSBeforeSuccess {
@ -913,7 +912,7 @@ func TestClientUpdatesParamsAfterGoAway(t *testing.T) {
t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
}
defer cc.Close()
time.Sleep(time.Second)
time.Sleep(1 * time.Second)
cc.mu.RLock()
defer cc.mu.RUnlock()
v := cc.mkp.Time
@ -944,7 +943,7 @@ func TestDisableServiceConfigOption(t *testing.T) {
}
]
}`)
time.Sleep(time.Second)
time.Sleep(1 * time.Second)
m := cc.GetMethodConfig("/foo/Bar")
if m.WaitForReady != nil {
t.Fatalf("want: method (\"/foo/bar/\") config to be empty, got: %v", m)
@ -1020,3 +1019,171 @@ func TestBackoffCancel(t *testing.T) {
cc.Close()
// Should not leak. May need -count 5000 to exercise.
}
// UpdateAddresses should cause the next reconnect to begin from the top of the
// list if the connection is not READY.
func TestUpdateAddresses_RetryFromFirstAddr(t *testing.T) {
cleanup := setMinConnectTimeout(time.Hour)
defer cleanup()
lis1, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
}
defer lis1.Close()
lis2, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
}
defer lis2.Close()
lis3, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
}
defer lis3.Close()
closeServer2 := make(chan struct{})
server1ContactedFirstTime := make(chan struct{})
server1ContactedSecondTime := make(chan struct{})
server2ContactedFirstTime := make(chan struct{})
server2ContactedSecondTime := make(chan struct{})
server3Contacted := make(chan struct{})
stateNotifications := make(chan connectivity.State, 100)
testBalancer.ResetNotifier(stateNotifications)
// Launch server 1.
go func() {
// First, let's allow the initial connection to go READY. We need to do
// this because tryUpdateAddrs only works after there's some non-nil
// address on the ac, and curAddress is only set after READY.
conn1, err := lis1.Accept()
if err != nil {
t.Error(err)
return
}
go keepReading(conn1)
framer := http2.NewFramer(conn1, conn1)
if err := framer.WriteSettings(http2.Setting{}); err != nil {
t.Errorf("Error while writing settings frame. %v", err)
return
}
// Wait for the transport to become ready.
for s := range stateNotifications {
if s == connectivity.Ready {
break
}
}
// Once it's ready, curAddress has been set. So let's close this
// connection prompting the first reconnect cycle.
conn1.Close()
// Accept and immediately close, causing it to go to server2.
conn2, err := lis1.Accept()
if err != nil {
t.Error(err)
return
}
close(server1ContactedFirstTime)
conn2.Close()
// Hopefully it picks this server after tryUpdateAddrs.
lis1.Accept()
close(server1ContactedSecondTime)
}()
// Launch server 2.
go func() {
// Accept and then hang waiting for the test call tryUpdateAddrs and
// then signal to this server to close. After this server closes, it
// should start from the top instead of trying server2 or continuing
// to server3.
conn, err := lis2.Accept()
if err != nil {
t.Error(err)
return
}
close(server2ContactedFirstTime)
<-closeServer2
conn.Close()
// After tryUpdateAddrs, it should NOT try server2.
lis2.Accept()
close(server2ContactedSecondTime)
}()
// Launch server 3.
go func() {
// After tryUpdateAddrs, it should NOT try server3. (or any other time)
lis3.Accept()
close(server3Contacted)
}()
addrsList := []resolver.Address{
{Addr: lis1.Addr().String()},
{Addr: lis2.Addr().String()},
{Addr: lis3.Addr().String()},
}
rb := manual.NewBuilderWithScheme("whatever")
rb.InitialAddrs(addrsList)
client, err := Dial("this-gets-overwritten", WithInsecure(), WithWaitForHandshake(), withResolverBuilder(rb), withBackoff(noBackoff{}), WithBalancerName(stateRecordingBalancerName))
if err != nil {
t.Fatal(err)
}
defer client.Close()
timeout := time.After(5 * time.Second)
// Wait for server1 to be contacted (which will immediately fail), then
// server2 (which will hang waiting for our signal).
select {
case <-server1ContactedFirstTime:
case <-timeout:
t.Fatal("timed out waiting for server1 to be contacted")
}
select {
case <-server2ContactedFirstTime:
case <-timeout:
t.Fatal("timed out waiting for server2 to be contacted")
}
// Grab the addrConn and call tryUpdateAddrs.
var ac *addrConn
client.mu.Lock()
for clientAC := range client.conns {
ac = clientAC
break
}
client.mu.Unlock()
ac.acbw.UpdateAddresses(addrsList)
// We've called tryUpdateAddrs - now let's make server2 close the
// connection and check that it goes back to server1 instead of continuing
// to server3 or trying server2 again.
close(closeServer2)
select {
case <-server1ContactedSecondTime:
case <-server2ContactedSecondTime:
t.Fatal("server2 was contacted a second time, but it after tryUpdateAddrs it should have re-started the list and tried server1")
case <-server3Contacted:
t.Fatal("server3 was contacted, but after tryUpdateAddrs it should have re-started the list and tried server1")
case <-timeout:
t.Fatal("timed out waiting for any server to be contacted after tryUpdateAddrs")
}
}
// Set the minConnectTimeout. Be sure to defer cleanup!
func setMinConnectTimeout(newMin time.Duration) (cleanup func()) {
mctBkp := getMinConnectTimeout()
atomic.StoreInt64((*int64)(&mutableMinConnectTimeout), int64(newMin))
return func() {
atomic.StoreInt64((*int64)(&mutableMinConnectTimeout), int64(mctBkp))
}
}

View File

@ -34,7 +34,7 @@ const (
type RequireHandshakeSetting int
const (
// RequireHandshakeHybrid (default, deprecated) indicates to wait for
// RequireHandshakeHybrid (default, deprecated) indicates to not wait for
// handshake before considering a connection ready, but wait before
// considering successful.
RequireHandshakeHybrid RequireHandshakeSetting = iota

View File

@ -91,10 +91,10 @@ type http2Client struct {
maxSendHeaderListSize *uint32
bdpEst *bdpEstimator
// onSuccess is a callback that client transport calls upon
// onPrefaceReceipt is a callback that client transport calls upon
// receiving server preface to signal that a succefull HTTP2
// connection was established.
onSuccess func()
onPrefaceReceipt func()
maxConcurrentStreams uint32
streamQuota int64
@ -145,7 +145,7 @@ func isTemporary(err error) bool {
// newHTTP2Client constructs a connected ClientTransport to addr based on HTTP2
// and starts to receive messages on it. Non-nil error returns if construction
// fails.
func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onSuccess func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (_ *http2Client, err error) {
scheme := "http"
ctx, cancel := context.WithCancel(ctx)
defer func() {
@ -240,7 +240,7 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne
kp: kp,
statsHandler: opts.StatsHandler,
initialWindowSize: initialWindowSize,
onSuccess: onSuccess,
onPrefaceReceipt: onPrefaceReceipt,
nextID: 1,
maxConcurrentStreams: defaultMaxStreamsClient,
streamQuota: defaultMaxStreamsClient,
@ -780,7 +780,7 @@ func (t *http2Client) Close() error {
}
t.statsHandler.HandleConn(t.ctx, connEnd)
}
go t.onClose()
t.onClose()
return err
}
@ -1210,7 +1210,7 @@ func (t *http2Client) reader() {
t.Close() // this kicks off resetTransport, so must be last before return
return
}
t.onSuccess()
t.onPrefaceReceipt()
t.handleSettings(sf, true)
// loop to keep reading incoming messages on this transport.

View File

@ -511,8 +511,8 @@ type TargetInfo struct {
// NewClientTransport establishes the transport with the required ConnectOptions
// and returns it to the caller.
func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onSuccess func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
return newHTTP2Client(connectCtx, ctx, target, opts, onSuccess, onGoAway, onClose)
func NewClientTransport(connectCtx, ctx context.Context, target TargetInfo, opts ConnectOptions, onPrefaceReceipt func(), onGoAway func(GoAwayReason), onClose func()) (ClientTransport, error) {
return newHTTP2Client(connectCtx, ctx, target, opts, onPrefaceReceipt, onGoAway, onClose)
}
// Options provides additional hints and information for message