transport: remove RequireHandshakeHybrid support (#2529)

This removes RequireHandshakeHybrid support and changes the default behavior
to RequireHandshakeOn. Dial calls will now block and wait for a successful
handshake before proceeding. Users relying on the old hybrid behavior (cmux
users) should consult https://github.com/soheilhy/cmux/issues/64.

Also, several tests have been updated to take this into consideration by
sending settings frames.
This commit is contained in:
Jean de Klerk 2019-02-27 11:04:46 -07:00 committed by GitHub
parent a51d23e017
commit 5878d965b2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 172 additions and 239 deletions

View File

@ -1019,35 +1019,8 @@ func (ac *addrConn) resetTransport() {
reconnect := grpcsync.NewEvent()
prefaceReceived := make(chan struct{})
newTr, err := ac.createTransport(addr, copts, connectDeadline, reconnect, prefaceReceived)
if err == nil {
ac.mu.Lock()
ac.curAddr = addr
ac.transport = newTr
ac.mu.Unlock()
healthCheckConfig := ac.cc.healthCheckConfig()
// LB channel health checking is only enabled when all the four requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
// 3. a service config with non-empty healthCheckConfig field is provided,
// 4. the current load balancer allows it.
healthcheckManagingState := false
if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
if ac.cc.dopts.healthCheckFunc == nil {
// TODO: add a link to the health check doc in the error message.
grpclog.Error("the client side LB channel health check function has not been set.")
} else {
// TODO(deklerk) refactor to just return transport
go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
healthcheckManagingState = true
}
}
if !healthcheckManagingState {
ac.mu.Lock()
ac.updateConnectivityState(connectivity.Ready)
ac.mu.Unlock()
}
} else {
if err != nil {
ac.cc.blockingpicker.updateConnectionError(err)
hcancel()
if err == errConnClosing {
return
@ -1060,55 +1033,46 @@ func (ac *addrConn) resetTransport() {
}
ac.mu.Lock()
reqHandshake := ac.dopts.reqHandshake
ac.curAddr = addr
ac.transport = newTr
ac.mu.Unlock()
healthCheckConfig := ac.cc.healthCheckConfig()
// LB channel health checking is only enabled when all the four requirements below are met:
// 1. it is not disabled by the user with the WithDisableHealthCheck DialOption,
// 2. the internal.HealthCheckFunc is set by importing the grpc/healthcheck package,
// 3. a service config with non-empty healthCheckConfig field is provided,
// 4. the current load balancer allows it.
healthcheckManagingState := false
if !ac.cc.dopts.disableHealthCheck && healthCheckConfig != nil && ac.scopts.HealthCheckEnabled {
if ac.cc.dopts.healthCheckFunc == nil {
// TODO: add a link to the health check doc in the error message.
grpclog.Error("the client side LB channel health check function has not been set.")
} else {
// TODO(deklerk) refactor to just return transport
go ac.startHealthCheck(hctx, newTr, addr, healthCheckConfig.ServiceName)
healthcheckManagingState = true
}
}
if !healthcheckManagingState {
ac.mu.Lock()
ac.updateConnectivityState(connectivity.Ready)
ac.mu.Unlock()
}
<-reconnect.Done()
hcancel()
if reqHandshake == envconfig.RequireHandshakeHybrid {
// In RequireHandshakeHybrid mode, we must check to see whether
// server preface has arrived yet to decide whether to start
// reconnecting at the top of the list (server preface received)
// or continue with the next addr in the list as if the
// connection were not successful (server preface not received).
select {
case <-prefaceReceived:
// We received a server preface - huzzah! We consider this
// a success and restart from the top of the addr list.
ac.mu.Lock()
ac.backoffIdx = 0
ac.mu.Unlock()
break addrLoop
default:
// Despite having set state to READY, in hybrid mode we
// consider this a failure and continue connecting at the
// next addr in the list.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
ac.mu.Unlock()
return
}
ac.updateConnectivityState(connectivity.TransientFailure)
ac.mu.Unlock()
if tryNextAddrFromStart.HasFired() {
break addrLoop
}
}
} else {
// In RequireHandshakeOn mode, we would have already waited for
// the server preface, so we consider this a success and restart
// from the top of the addr list. In RequireHandshakeOff mode,
// we don't care to wait for the server preface before
// considering this a success, so we also restart from the top
// of the addr list.
ac.mu.Lock()
ac.backoffIdx = 0
ac.mu.Unlock()
break addrLoop
}
// In RequireHandshakeOn mode, we would have already waited for
// the server preface, so we consider this a success and restart
// from the top of the addr list. In RequireHandshakeOff mode,
// we don't care to wait for the server preface before
// considering this a success, so we also restart from the top
// of the addr list.
ac.mu.Lock()
ac.backoffIdx = 0
ac.mu.Unlock()
break addrLoop
}
// After exhausting all addresses, or after need to reconnect after a
@ -1154,8 +1118,6 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
Authority: ac.cc.authority,
}
prefaceTimer := time.NewTimer(time.Until(connectDeadline))
onGoAway := func(r transport.GoAwayReason) {
ac.mu.Lock()
ac.adjustParams(r)
@ -1165,13 +1127,11 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
onClose := func() {
close(onCloseCalled)
prefaceTimer.Stop()
reconnect.Fire()
}
onPrefaceReceipt := func() {
close(prefaceReceived)
prefaceTimer.Stop()
}
connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline)
@ -1181,38 +1141,8 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
}
newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt, onGoAway, onClose)
if err == nil {
if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
select {
case <-prefaceTimer.C:
// We didn't get the preface in time.
newTr.Close()
err = errors.New("timed out waiting for server handshake")
case <-prefaceReceived:
// We got the preface - huzzah! things are good.
case <-onCloseCalled:
// The transport has already closed - noop.
return nil, errors.New("connection closed")
}
} else if ac.dopts.reqHandshake == envconfig.RequireHandshakeHybrid {
go func() {
select {
case <-prefaceTimer.C:
// We didn't get the preface in time.
newTr.Close()
case <-prefaceReceived:
// We got the preface just in the nick of time - huzzah!
case <-onCloseCalled:
// The transport has already closed - noop.
}
}()
}
}
if err != nil {
// newTr is either nil, or closed.
ac.cc.blockingpicker.updateConnectionError(err)
ac.mu.Lock()
if ac.state == connectivity.Shutdown {
// ac.tearDown(...) has been invoked.
@ -1225,6 +1155,22 @@ func (ac *addrConn) createTransport(addr resolver.Address, copts transport.Conne
return nil, err
}
if ac.dopts.reqHandshake == envconfig.RequireHandshakeOn {
select {
case <-time.After(connectDeadline.Sub(time.Now())):
// We didn't get the preface in time.
newTr.Close()
grpclog.Warningf("grpc: addrConn.createTransport failed to connect to %v: didn't receive server preface in time. Reconnecting...", addr)
return nil, errors.New("timed out waiting for server handshake")
case <-prefaceReceived:
// We got the preface - huzzah! things are good.
case <-onCloseCalled:
// The transport has already closed - noop.
return nil, errors.New("connection closed")
// TODO(deklerk) this should bail on ac.ctx.Done(). Add a test and fix.
}
}
// Now there is a viable transport to be use, so set ac.transport to reflect the new viable transport.
ac.mu.Lock()
if ac.state == connectivity.Shutdown {

View File

@ -121,15 +121,6 @@ func (s) TestDialWithMultipleBackendsNotSendingServerPreface(t *testing.T) {
var allReqHSSettings = []envconfig.RequireHandshakeSetting{
envconfig.RequireHandshakeOff,
envconfig.RequireHandshakeOn,
envconfig.RequireHandshakeHybrid,
}
var reqNoHSSettings = []envconfig.RequireHandshakeSetting{
envconfig.RequireHandshakeOff,
envconfig.RequireHandshakeHybrid,
}
var reqHSBeforeSuccess = []envconfig.RequireHandshakeSetting{
envconfig.RequireHandshakeOn,
envconfig.RequireHandshakeHybrid,
}
func (s) TestDialWaitsForServerSettings(t *testing.T) {
@ -332,50 +323,46 @@ func (s) TestDialDoesNotWaitForServerSettings(t *testing.T) {
// Restore current setting after test.
old := envconfig.RequireHandshake
defer func() { envconfig.RequireHandshake = old }()
envconfig.RequireHandshake = envconfig.RequireHandshakeOff
// Test with "off" and "hybrid".
for _, setting := range reqNoHSSettings {
envconfig.RequireHandshake = setting
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
}
defer lis.Close()
done := make(chan struct{})
dialDone := make(chan struct{})
go func() { // Launch the server.
defer func() {
close(done)
}()
conn, err := lis.Accept()
if err != nil {
t.Errorf("Error while accepting. Err: %v", err)
return
}
defer conn.Close()
<-dialDone // Close conn only after dial returns.
}()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithBlock())
if err != nil {
t.Fatalf("DialContext returned err =%v; want nil", err)
}
defer client.Close()
if state := client.GetState(); state != connectivity.Ready {
t.Fatalf("client.GetState() = %v; want connectivity.Ready", state)
}
close(dialDone)
<-done
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
}
defer lis.Close()
done := make(chan struct{})
dialDone := make(chan struct{})
go func() { // Launch the server.
defer func() {
close(done)
}()
conn, err := lis.Accept()
if err != nil {
t.Errorf("Error while accepting. Err: %v", err)
return
}
defer conn.Close()
<-dialDone // Close conn only after dial returns.
}()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
client, err := DialContext(ctx, lis.Addr().String(), WithInsecure(), WithBlock())
if err != nil {
t.Fatalf("DialContext returned err =%v; want nil", err)
}
defer client.Close()
if state := client.GetState(); state != connectivity.Ready {
t.Fatalf("client.GetState() = %v; want connectivity.Ready", state)
}
close(dialDone)
}
func (s) TestCloseConnectionWhenServerPrefaceNotReceived(t *testing.T) {
// Restore current setting after test.
old := envconfig.RequireHandshake
defer func() { envconfig.RequireHandshake = old }()
envconfig.RequireHandshake = envconfig.RequireHandshakeOn
// 1. Client connects to a server that doesn't send preface.
// 2. After minConnectTimeout(500 ms here), client disconnects and retries.
@ -384,80 +371,75 @@ func (s) TestCloseConnectionWhenServerPrefaceNotReceived(t *testing.T) {
cleanup := setMinConnectTimeout(time.Millisecond * 500)
defer cleanup()
// Test with "on" and "hybrid".
for _, setting := range reqHSBeforeSuccess {
envconfig.RequireHandshake = setting
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
}
var (
conn2 net.Conn
over uint32
)
defer func() {
lis.Close()
// conn2 shouldn't be closed until the client has
// observed a successful test.
if conn2 != nil {
conn2.Close()
}
}()
done := make(chan struct{})
accepted := make(chan struct{})
go func() { // Launch the server.
defer close(done)
conn1, err := lis.Accept()
if err != nil {
t.Errorf("Error while accepting. Err: %v", err)
return
}
defer conn1.Close()
// Don't send server settings and the client should close the connection and try again.
conn2, err = lis.Accept() // Accept a reconnection request from client.
if err != nil {
t.Errorf("Error while accepting. Err: %v", err)
return
}
close(accepted)
framer := http2.NewFramer(conn2, conn2)
if err = framer.WriteSettings(http2.Setting{}); err != nil {
t.Errorf("Error while writing settings. Err: %v", err)
return
}
b := make([]byte, 8)
for {
_, err = conn2.Read(b)
if err == nil {
continue
}
if atomic.LoadUint32(&over) == 1 {
// The connection stayed alive for the timer.
// Success.
return
}
t.Errorf("Unexpected error while reading. Err: %v, want timeout error", err)
break
}
}()
client, err := Dial(lis.Addr().String(), WithInsecure())
if err != nil {
t.Fatalf("Error while dialing. Err: %v", err)
}
// wait for connection to be accepted on the server.
timer := time.NewTimer(time.Second * 10)
select {
case <-accepted:
case <-timer.C:
t.Fatalf("Client didn't make another connection request in time.")
}
// Make sure the connection stays alive for sometime.
time.Sleep(time.Second)
atomic.StoreUint32(&over, 1)
client.Close()
<-done
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Error while listening. Err: %v", err)
}
var (
conn2 net.Conn
over uint32
)
defer func() {
lis.Close()
// conn2 shouldn't be closed until the client has
// observed a successful test.
if conn2 != nil {
conn2.Close()
}
}()
done := make(chan struct{})
accepted := make(chan struct{})
go func() { // Launch the server.
defer close(done)
conn1, err := lis.Accept()
if err != nil {
t.Errorf("Error while accepting. Err: %v", err)
return
}
defer conn1.Close()
// Don't send server settings and the client should close the connection and try again.
conn2, err = lis.Accept() // Accept a reconnection request from client.
if err != nil {
t.Errorf("Error while accepting. Err: %v", err)
return
}
close(accepted)
framer := http2.NewFramer(conn2, conn2)
if err = framer.WriteSettings(http2.Setting{}); err != nil {
t.Errorf("Error while writing settings. Err: %v", err)
return
}
b := make([]byte, 8)
for {
_, err = conn2.Read(b)
if err == nil {
continue
}
if atomic.LoadUint32(&over) == 1 {
// The connection stayed alive for the timer.
// Success.
return
}
t.Errorf("Unexpected error while reading. Err: %v, want timeout error", err)
break
}
}()
client, err := Dial(lis.Addr().String(), WithInsecure())
if err != nil {
t.Fatalf("Error while dialing. Err: %v", err)
}
// wait for connection to be accepted on the server.
timer := time.NewTimer(time.Second * 10)
select {
case <-accepted:
case <-timer.C:
t.Fatalf("Client didn't make another connection request in time.")
}
// Make sure the connection stays alive for sometime.
time.Sleep(time.Second)
atomic.StoreUint32(&over, 1)
client.Close()
<-done
}
func (s) TestBackoffWhenNoServerPrefaceReceived(t *testing.T) {

View File

@ -34,13 +34,9 @@ const (
type RequireHandshakeSetting int
const (
// RequireHandshakeHybrid (default, deprecated) indicates to not wait for
// handshake before considering a connection ready, but wait before
// considering successful.
RequireHandshakeHybrid RequireHandshakeSetting = iota
// RequireHandshakeOn (default after the 1.17 release) indicates to wait
// for handshake before considering a connection ready/successful.
RequireHandshakeOn
// RequireHandshakeOn indicates to wait for handshake before considering a
// connection ready/successful.
RequireHandshakeOn RequireHandshakeSetting = iota
// RequireHandshakeOff indicates to not wait for handshake before
// considering a connection ready/successful.
RequireHandshakeOff
@ -53,7 +49,7 @@ var (
// environment variable.
//
// Will be removed after the 1.18 release.
RequireHandshake RequireHandshakeSetting
RequireHandshake = RequireHandshakeOn
)
func init() {
@ -64,8 +60,5 @@ func init() {
RequireHandshake = RequireHandshakeOn
case "off":
RequireHandshake = RequireHandshakeOff
case "hybrid":
// Will be removed after the 1.17 release.
RequireHandshake = RequireHandshakeHybrid
}
}

View File

@ -27,8 +27,8 @@ import (
"testing"
"time"
"golang.org/x/net/http2"
"google.golang.org/grpc"
"google.golang.org/grpc/internal/envconfig"
testpb "google.golang.org/grpc/test/grpc_testing"
)
@ -52,6 +52,13 @@ func (d *delayListener) Accept() (net.Conn, error) {
default:
close(d.acceptCalled)
conn, err := d.Listener.Accept()
if err != nil {
return nil, err
}
framer := http2.NewFramer(conn, conn)
if err = framer.WriteSettings(http2.Setting{}); err != nil {
return nil, err
}
// Allow closing of listener only after accept.
// Note: Dial can return successfully, yet Accept
// might now have finished.
@ -107,10 +114,14 @@ func (d *delayConn) Read(b []byte) (n int, err error) {
}
func (s) TestGracefulStop(t *testing.T) {
// Set default behavior and restore current setting after test.
// We need to turn off RequireHandshake because if it were on, it would
// block forever waiting to read the handshake, and the delayConn would
// never let it (the delay is intended to block until later in the test).
//
// Restore current setting after test.
old := envconfig.RequireHandshake
envconfig.RequireHandshake = envconfig.RequireHandshakeOff
defer func() { envconfig.RequireHandshake = old }()
envconfig.RequireHandshake = envconfig.RequireHandshakeOff
// This test ensures GracefulStop cannot race and break RPCs on new
// connections created after GracefulStop was called but before

1
vet.sh
View File

@ -119,5 +119,6 @@ google.golang.org/grpc/stats/stats_test.go:SA1019
google.golang.org/grpc/test/channelz_test.go:SA1019
google.golang.org/grpc/test/end2end_test.go:SA1019
google.golang.org/grpc/test/healthcheck_test.go:SA1019
google.golang.org/grpc/clientconn.go:S1024
' ./...
misspell -error .