Client should update keepalive parameters upon receiving GoAway with … (#1169)

* Client should update keepalive parameters upon receiving GoAway with EnhanceYourCalm and debug data of too_many_pings.
This commit is contained in:
MakMukhi 2017-04-10 14:33:51 -07:00 committed by GitHub
parent b47cbd158b
commit bfa5dd27dc
4 changed files with 91 additions and 0 deletions

View File

@ -309,6 +309,7 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
for _, opt := range opts {
opt(&cc.dopts)
}
cc.mkp = cc.dopts.copts.KeepaliveParams
grpcUA := "grpc-go/" + Version
if cc.dopts.copts.UserAgent != "" {
@ -458,6 +459,8 @@ type ClientConn struct {
mu sync.RWMutex
sc ServiceConfig
conns map[Address]*addrConn
// Keepalive parameter can be udated if a GoAway is received.
mkp keepalive.ClientParameters
}
// lbWatcher watches the Notify channel of the balancer in cc and manages
@ -533,6 +536,9 @@ func (cc *ClientConn) resetAddrConn(addr Address, block bool, tearDownErr error)
addr: addr,
dopts: cc.dopts,
}
cc.mu.RLock()
ac.dopts.copts.KeepaliveParams = cc.mkp
cc.mu.RUnlock()
ac.ctx, ac.cancel = context.WithCancel(cc.ctx)
ac.stateCV = sync.NewCond(&ac.mu)
if EnableTracing {
@ -714,6 +720,20 @@ type addrConn struct {
tearDownErr error
}
// adjustParams updates parameters used to create transports upon
// receiving a GoAway.
func (ac *addrConn) adjustParams(r transport.GoAwayReason) {
switch r {
case transport.TooManyPings:
v := 2 * ac.dopts.copts.KeepaliveParams.Time
ac.cc.mu.Lock()
if v > ac.cc.mkp.Time {
ac.cc.mkp.Time = v
}
ac.cc.mu.Unlock()
}
}
// printf records an event in ac's event log, unless ac has been closed.
// REQUIRES ac.mu is held.
func (ac *addrConn) printf(format string, a ...interface{}) {
@ -870,6 +890,7 @@ func (ac *addrConn) transportMonitor() {
}
return
case <-t.GoAway():
ac.adjustParams(t.GetGoAwayReason())
// If GoAway happens without any network I/O error, ac is closed without shutting down the
// underlying transport (the transport will be closed when all the pending RPCs finished or
// failed.).
@ -889,6 +910,7 @@ func (ac *addrConn) transportMonitor() {
t.Close()
return
case <-t.GoAway():
ac.adjustParams(t.GetGoAwayReason())
ac.cc.resetAddrConn(ac.addr, false, errNetworkIO)
return
default:

View File

@ -41,6 +41,7 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
)
const tlsDir = "testdata/"
@ -306,3 +307,31 @@ func TestNonblockingDialWithEmptyBalancer(t *testing.T) {
<-dialDone
cancel()
}
func TestClientUpdatesParamsAfterGoAway(t *testing.T) {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Failed to listen. Err: %v", err)
}
defer lis.Close()
addr := lis.Addr().String()
s := NewServer()
go s.Serve(lis)
defer s.Stop()
cc, err := Dial(addr, WithBlock(), WithInsecure(), WithKeepaliveParams(keepalive.ClientParameters{
Time: 50 * time.Millisecond,
Timeout: 1 * time.Millisecond,
PermitWithoutStream: true,
}))
if err != nil {
t.Fatalf("Dial(%s, _) = _, %v, want _, <nil>", addr, err)
}
defer cc.Close()
time.Sleep(1 * time.Second)
cc.mu.RLock()
defer cc.mu.RUnlock()
v := cc.mkp.Time
if v < 100*time.Millisecond {
t.Fatalf("cc.dopts.copts.Keepalive.Time = %v , want 100ms", v)
}
}

View File

@ -121,6 +121,9 @@ type http2Client struct {
goAwayID uint32
// prevGoAway ID records the Last-Stream-ID in the previous GOAway frame.
prevGoAwayID uint32
// goAwayReason records the http2.ErrCode and debug data received with the
// GoAway frame.
goAwayReason GoAwayReason
}
func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) {
@ -909,6 +912,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
t.mu.Unlock()
return
default:
t.setGoAwayReason(f)
}
t.goAwayID = f.LastStreamID
close(t.goAway)
@ -916,6 +920,26 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
t.mu.Unlock()
}
// setGoAwayReason sets the value of t.goAwayReason based
// on the GoAway frame received.
// It expects a lock on transport's mutext to be held by
// the caller.
func (t *http2Client) setGoAwayReason(f *http2.GoAwayFrame) {
t.goAwayReason = NoReason
switch f.ErrCode {
case http2.ErrCodeEnhanceYourCalm:
if string(f.DebugData()) == "too_many_pings" {
t.goAwayReason = TooManyPings
}
}
}
func (t *http2Client) GetGoAwayReason() GoAwayReason {
t.mu.Lock()
defer t.mu.Unlock()
return t.goAwayReason
}
func (t *http2Client) handleWindowUpdate(f *http2.WindowUpdateFrame) {
id := f.Header().StreamID
incr := f.Increment

View File

@ -493,6 +493,9 @@ type ClientTransport interface {
// receives the draining signal from the server (e.g., GOAWAY frame in
// HTTP/2).
GoAway() <-chan struct{}
// GetGoAwayReason returns the reason why GoAway frame was received.
GetGoAwayReason() GoAwayReason
}
// ServerTransport is the common interface for all gRPC server-side transport
@ -630,3 +633,16 @@ func wait(ctx context.Context, done, goAway, closing <-chan struct{}, proceed <-
return i, nil
}
}
// GoAwayReason contains the reason for the GoAway frame received.
type GoAwayReason uint8
const (
// Invalid indicates that no GoAway frame is received.
Invalid GoAwayReason = 0
// NoReason is the default value when GoAway frame is received.
NoReason GoAwayReason = 1
// TooManyPings indicates that a GoAway frame with ErrCodeEnhanceYourCalm
// was recieved and that the debug data said "too_many_pings".
TooManyPings GoAwayReason = 2
)