mirror of https://github.com/grpc/grpc-go.git
grpc: ensure transports are closed when the channel enters IDLE (#6620)
This commit is contained in:
parent
552525e56b
commit
2d1bb21e4d
|
@ -1091,8 +1091,8 @@ func (ac *addrConn) updateAddrs(addrs []resolver.Address) {
|
|||
ac.cancel()
|
||||
ac.ctx, ac.cancel = context.WithCancel(ac.cc.ctx)
|
||||
|
||||
// We have to defer here because GracefulClose => Close => onClose, which
|
||||
// requires locking ac.mu.
|
||||
// We have to defer here because GracefulClose => onClose, which requires
|
||||
// locking ac.mu.
|
||||
if ac.transport != nil {
|
||||
defer ac.transport.GracefulClose()
|
||||
ac.transport = nil
|
||||
|
@ -1680,16 +1680,7 @@ func (ac *addrConn) tearDown(err error) {
|
|||
ac.updateConnectivityState(connectivity.Shutdown, nil)
|
||||
ac.cancel()
|
||||
ac.curAddr = resolver.Address{}
|
||||
if err == errConnDrain && curTr != nil {
|
||||
// GracefulClose(...) may be executed multiple times when
|
||||
// i) receiving multiple GoAway frames from the server; or
|
||||
// ii) there are concurrent name resolver/Balancer triggered
|
||||
// address removal and GoAway.
|
||||
// We have to unlock and re-lock here because GracefulClose => Close => onClose, which requires locking ac.mu.
|
||||
ac.mu.Unlock()
|
||||
curTr.GracefulClose()
|
||||
ac.mu.Lock()
|
||||
}
|
||||
|
||||
channelz.AddTraceEvent(logger, ac.channelzID, 0, &channelz.TraceEventDesc{
|
||||
Desc: "Subchannel deleted",
|
||||
Severity: channelz.CtInfo,
|
||||
|
@ -1703,6 +1694,29 @@ func (ac *addrConn) tearDown(err error) {
|
|||
// being deleted right away.
|
||||
channelz.RemoveEntry(ac.channelzID)
|
||||
ac.mu.Unlock()
|
||||
|
||||
// We have to release the lock before the call to GracefulClose/Close here
|
||||
// because both of them call onClose(), which requires locking ac.mu.
|
||||
if curTr != nil {
|
||||
if err == errConnDrain {
|
||||
// Close the transport gracefully when the subConn is being shutdown.
|
||||
//
|
||||
// GracefulClose() may be executed multiple times if:
|
||||
// - multiple GoAway frames are received from the server
|
||||
// - there are concurrent name resolver or balancer triggered
|
||||
// address removal and GoAway
|
||||
curTr.GracefulClose()
|
||||
} else {
|
||||
// Hard close the transport when the channel is entering idle or is
|
||||
// being shutdown. In the case where the channel is being shutdown,
|
||||
// closing of transports is also taken care of by cancelation of cc.ctx.
|
||||
// But in the case where the channel is entering idle, we need to
|
||||
// explicitly close the transports here. Instead of distinguishing
|
||||
// between these two cases, it is simpler to close the transport
|
||||
// unconditionally here.
|
||||
curTr.Close(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (ac *addrConn) getState() connectivity.State {
|
||||
|
|
|
@ -142,7 +142,8 @@ func (s) TestChannelIdleness_Disabled_NoActivity(t *testing.T) {
|
|||
}
|
||||
|
||||
// Tests the case where channel idleness is enabled by passing a small value for
|
||||
// idle_timeout. Verifies that a READY channel with no RPCs moves to IDLE.
|
||||
// idle_timeout. Verifies that a READY channel with no RPCs moves to IDLE, and
|
||||
// the connection to the backend is closed.
|
||||
func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
|
||||
// Create a ClientConn with a short idle_timeout.
|
||||
r := manual.NewBuilderWithScheme("whatever")
|
||||
|
@ -159,7 +160,8 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
|
|||
t.Cleanup(func() { cc.Close() })
|
||||
|
||||
// Start a test backend and push an address update via the resolver.
|
||||
backend := stubserver.StartTestService(t, nil)
|
||||
lis := testutils.NewListenerWrapper(t, nil)
|
||||
backend := stubserver.StartTestService(t, &stubserver.StubServer{Listener: lis})
|
||||
t.Cleanup(backend.Stop)
|
||||
r.UpdateState(resolver.State{Addresses: []resolver.Address{{Addr: backend.Address}}})
|
||||
|
||||
|
@ -168,6 +170,13 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
|
|||
defer cancel()
|
||||
testutils.AwaitState(ctx, t, cc, connectivity.Ready)
|
||||
|
||||
// Retrieve the wrapped conn from the listener.
|
||||
v, err := lis.NewConnCh.Receive(ctx)
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to retrieve conn from test listener: %v", err)
|
||||
}
|
||||
conn := v.(*testutils.ConnWrapper)
|
||||
|
||||
// Verify that the ClientConn moves to IDLE as there is no activity.
|
||||
testutils.AwaitState(ctx, t, cc, connectivity.Idle)
|
||||
|
||||
|
@ -175,6 +184,11 @@ func (s) TestChannelIdleness_Enabled_NoActivity(t *testing.T) {
|
|||
if err := channelzTraceEventFound(ctx, "entering idle mode"); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
// Verify that the previously open connection is closed.
|
||||
if _, err := conn.CloseCh.Receive(ctx); err != nil {
|
||||
t.Fatalf("Failed when waiting for connection to be closed after channel entered IDLE: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
// Tests the case where channel idleness is enabled by passing a small value for
|
||||
|
|
Loading…
Reference in New Issue