diff --git a/keepalive/keepalive.go b/keepalive/keepalive.go index 741b9b1a6..39aa1c780 100644 --- a/keepalive/keepalive.go +++ b/keepalive/keepalive.go @@ -9,22 +9,42 @@ import ( // Params is used to set keepalive parameters. type Params struct { // After a duration of this time the client pings the server to see if the transport is still alive. - Ktime time.Duration + Time time.Duration // After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport. - Ktimeout time.Duration + Timeout time.Duration //If true, client runs keepalive checks even with no active RPCs. - KNoStream bool + PermitNoStream bool } // DefaultKParams contains default values for keepalive parameters var DefaultKParams = Params{ - Ktime: time.Duration(math.MaxInt64), // default to infinite - Ktimeout: time.Duration(20 * 1000 * 1000 * 1000), // default to 20 seconds - KNoStream: false, + Time: time.Duration(math.MaxInt64), // default to infinite + Timeout: time.Duration(20 * time.Second), } -// Mu is a mutex to protect Enabled variable -var Mu = sync.Mutex{} +// mu is a mutex to protect Enabled variable +var mu = sync.Mutex{} -// Enabled is a knob used to turn keepalive on or off -var Enabled = false +// enable is a knob used to turn keepalive on or off +var enable = false + +// Enabled exposes the value of enable variable +func Enabled() bool { + mu.Lock() + defer mu.Unlock() + return enable +} + +// Enable can be called to enable keepalives +func Enable() { + mu.Lock() + defer mu.Unlock() + enable = true +} + +// Disable can be called to disable keepalive +func Disable() { + mu.Lock() + defer mu.Unlock() + enable = false +} diff --git a/transport/http2_client.go b/transport/http2_client.go index e691e4be4..cc516b0e5 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -101,7 +101,7 @@ type http2Client struct { creds []credentials.PerRPCCredentials // activity counter - activity *uint64 + activity uint64 // accessed atomically // keepalive parameters keepaliveParams keepalive.Params @@ -218,7 +218,6 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( maxStreams: math.MaxInt32, streamSendQuota: defaultWindowSize, keepaliveParams: kp, - activity: new(uint64), } // Start the reader goroutine for incoming message. Each transport has // a dedicated goroutine which reads HTTP2 frame from network. Then it @@ -704,7 +703,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { } } // activity++ - atomic.AddUint64(t.activity, 1) + atomic.AddUint64(&t.activity, 1) if !opts.Last { return nil } @@ -846,7 +845,7 @@ func (t *http2Client) handlePing(f *http2.PingFrame) { copy(pingAck.data[:], f.Data[:]) t.controlBuf.put(pingAck) // activity++ - atomic.AddUint64(t.activity, 1) + atomic.AddUint64(&t.activity, 1) } func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { @@ -994,7 +993,7 @@ func (t *http2Client) reader() { for { frame, err := t.framer.readFrame() // activity++ - atomic.AddUint64(t.activity, 1) + atomic.AddUint64(&t.activity, 1) if err != nil { // Abort an active stream if the http2.Framer returns a // http2.StreamError. This can happen only if the server's response @@ -1072,16 +1071,14 @@ func (t *http2Client) applySettings(ss []http2.Setting) { // frames (e.g., window update, reset stream, setting, etc.) to the server. func (t *http2Client) controller() { // Activity value seen by timer - ta := atomic.LoadUint64(t.activity) - timer := time.NewTimer(t.keepaliveParams.Ktime) - keepalive.Mu.Lock() - if !keepalive.Enabled { + ta := atomic.LoadUint64(&t.activity) + timer := time.NewTimer(t.keepaliveParams.Time) + if !keepalive.Enabled() { // Prevent the timer from firing, ever. if !timer.Stop() { <-timer.C } } - keepalive.Mu.Unlock() isPingSent := false keepalivePing := &ping{data: [8]byte{}} for { @@ -1119,16 +1116,16 @@ func (t *http2Client) controller() { ns := len(t.activeStreams) t.mu.Unlock() // Global activity value. - ga := atomic.LoadUint64(t.activity) - if ga > ta || (!t.keepaliveParams.KNoStream && ns < 1) { - timer.Reset(t.keepaliveParams.Ktime) + ga := atomic.LoadUint64(&t.activity) + if ga > ta || (!t.keepaliveParams.PermitNoStream && ns < 1) { + timer.Reset(t.keepaliveParams.Time) isPingSent = false } else { if !isPingSent { // send ping t.controlBuf.put(keepalivePing) isPingSent = true - timer.Reset(t.keepaliveParams.Ktimeout) + timer.Reset(t.keepaliveParams.Timeout) } else { t.Close() continue diff --git a/transport/transport_test.go b/transport/transport_test.go index 6e8be33b5..2f6779646 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -305,14 +305,13 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con } func TestKeepaliveClientClosesIdleTransport(t *testing.T) { - keepalive.Mu.Lock() - keepalive.Enabled = true - keepalive.Mu.Unlock() + keepalive.Enable() + defer keepalive.Disable() done := make(chan net.Conn, 1) cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.Params{ - Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec - Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec - KNoStream: true, // run keepalive even with no RPCs + Time: 2 * time.Second, // keepalive time = 2 sec + Timeout: 1 * time.Second, // keepalive timeout = 1 sec + PermitNoStream: true, // run keepalive even with no RPCs }}, done) defer cT.Close() conn, ok := <-done @@ -331,14 +330,13 @@ func TestKeepaliveClientClosesIdleTransport(t *testing.T) { } func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { - keepalive.Mu.Lock() - keepalive.Enabled = true - keepalive.Mu.Unlock() + keepalive.Enable() + defer keepalive.Disable() done := make(chan net.Conn, 1) cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.Params{ - Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec - Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec - KNoStream: false, // don't run keepalive even with no RPCs + Time: 2 * time.Second, // keepalive time = 2 sec + Timeout: 1 * time.Second, // keepalive timeout = 1 sec + PermitNoStream: false, // don't run keepalive even with no RPCs }}, done) defer cT.Close() conn, ok := <-done @@ -357,14 +355,13 @@ func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { } func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { - keepalive.Mu.Lock() - keepalive.Enabled = true - keepalive.Mu.Unlock() + keepalive.Enable() + defer keepalive.Disable() done := make(chan net.Conn, 1) cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.Params{ - Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec - Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec - KNoStream: false, // don't run keepalive even with no RPCs + Time: 2 * time.Second, // keepalive time = 2 sec + Timeout: 1 * time.Second, // keepalive timeout = 1 sec + PermitNoStream: false, // don't run keepalive even with no RPCs }}, done) defer cT.Close() conn, ok := <-done @@ -388,13 +385,12 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { } func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) { - keepalive.Mu.Lock() - keepalive.Enabled = true - keepalive.Mu.Unlock() + keepalive.Enable() + defer keepalive.Disable() s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KParams: keepalive.Params{ - Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec - Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec - KNoStream: true, // don't run keepalive even with no RPCs + Time: 2 * time.Second, // keepalive time = 2 sec + Timeout: 1 * time.Second, // keepalive timeout = 1 sec + PermitNoStream: true, // don't run keepalive even with no RPCs }}) defer s.stop() defer tr.Close()