all: replace RemoveSubConn with Shutdown as much as possible (#6505)

This commit is contained in:
Doug Fawley 2023-08-04 10:19:51 -07:00 committed by GitHub
parent 28ac6efee6
commit 6c0c69efd5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 297 additions and 290 deletions

View File

@ -367,8 +367,9 @@ type Balancer interface {
// Deprecated: Use NewSubConnOptions.StateListener when creating the
// SubConn instead.
UpdateSubConnState(SubConn, SubConnState)
// Close closes the balancer. The balancer is not required to call
// ClientConn.RemoveSubConn for its existing SubConns.
// Close closes the balancer. The balancer is not currently required to
// call SubConn.Shutdown for its existing SubConns; however, this will be
// required in a future release, so it is recommended.
Close()
}

View File

@ -121,7 +121,7 @@ func (b *baseBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
sc := sci.(balancer.SubConn)
// a was removed by resolver.
if _, ok := addrsSet.Get(a); !ok {
b.cc.RemoveSubConn(sc)
sc.Shutdown()
b.subConns.Delete(a)
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in UpdateSubConnState.
@ -204,8 +204,8 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
case connectivity.Idle:
sc.Connect()
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scStates. Remove state for this sc here.
// When an address was removed by resolver, b called Shutdown but kept
// the sc's state in scStates. Remove state for this sc here.
delete(b.scStates, sc)
case connectivity.TransientFailure:
// Save error to be reported via picker.
@ -226,7 +226,7 @@ func (b *baseBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.Su
}
// Close is a nop because base balancer doesn't have internal state to clean up,
// and it doesn't need to call RemoveSubConn for the SubConns.
// and it doesn't need to call Shutdown for the SubConns.
func (b *baseBalancer) Close() {
}

View File

@ -213,7 +213,7 @@ type lbBalancer struct {
backendAddrsWithoutMetadata []resolver.Address
// Roundrobin functionalities.
state connectivity.State
subConns map[resolver.Address]balancer.SubConn // Used to new/remove SubConn.
subConns map[resolver.Address]balancer.SubConn // Used to new/shutdown SubConn.
scStates map[balancer.SubConn]connectivity.State // Used to filter READY SubConns.
picker balancer.Picker
// Support fallback to resolved backend addresses if there's no response
@ -290,7 +290,7 @@ func (lb *lbBalancer) regeneratePicker(resetDrop bool) {
// aggregateSubConnStats calculate the aggregated state of SubConns in
// lb.SubConns. These SubConns are subconns in use (when switching between
// fallback and grpclb). lb.scState contains states for all SubConns, including
// those in cache (SubConns are cached for 10 seconds after remove).
// those in cache (SubConns are cached for 10 seconds after shutdown).
//
// The aggregated state is:
// - If at least one SubConn in Ready, the aggregated state is Ready;
@ -345,8 +345,8 @@ func (lb *lbBalancer) updateSubConnState(sc balancer.SubConn, scs balancer.SubCo
case connectivity.Idle:
sc.Connect()
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scStates. Remove state for this sc here.
// When an address was removed by resolver, b called Shutdown but kept
// the sc's state in scStates. Remove state for this sc here.
delete(lb.scStates, sc)
case connectivity.TransientFailure:
lb.connErr = scs.ConnectionError

View File

@ -113,7 +113,6 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
}
balancingPolicyChanged := lb.usePickFirst != pickFirst
oldUsePickFirst := lb.usePickFirst
lb.usePickFirst = pickFirst
if fallbackModeChanged || balancingPolicyChanged {
@ -123,13 +122,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
// For fallback mode switching with pickfirst, we want to recreate the
// SubConn because the creds could be different.
for a, sc := range lb.subConns {
if oldUsePickFirst {
// If old SubConn were created for pickfirst, bypass cache and
// remove directly.
lb.cc.ClientConn.RemoveSubConn(sc)
} else {
lb.cc.RemoveSubConn(sc)
}
sc.Shutdown()
delete(lb.subConns, a)
}
}
@ -144,7 +137,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
}
if sc != nil {
if len(backendAddrs) == 0 {
lb.cc.ClientConn.RemoveSubConn(sc)
sc.Shutdown()
delete(lb.subConns, scKey)
return
}
@ -197,7 +190,7 @@ func (lb *lbBalancer) refreshSubConns(backendAddrs []resolver.Address, fallback
for a, sc := range lb.subConns {
// a was removed by resolver.
if _, ok := addrsSet[a]; !ok {
lb.cc.RemoveSubConn(sc)
sc.Shutdown()
delete(lb.subConns, a)
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in UpdateSubConnState.

View File

@ -91,7 +91,7 @@ func (r *lbManualResolver) UpdateState(s resolver.State) {
const subConnCacheTime = time.Second * 10
// lbCacheClientConn is a wrapper balancer.ClientConn with a SubConn cache.
// SubConns will be kept in cache for subConnCacheTime before being removed.
// SubConns will be kept in cache for subConnCacheTime before being shut down.
//
// Its NewSubconn and SubConn.Shutdown methods are updated to do cache first.
type lbCacheClientConn struct {
@ -149,7 +149,7 @@ func (ccc *lbCacheClientConn) NewSubConn(addrs []resolver.Address, opts balancer
}
func (ccc *lbCacheClientConn) RemoveSubConn(sc balancer.SubConn) {
sc.Shutdown()
logger.Errorf("RemoveSubConn(%v) called unexpectedly", sc)
}
type lbCacheSubConn struct {
@ -168,9 +168,9 @@ func (sc *lbCacheSubConn) Shutdown() {
if entry, ok := ccc.subConnCache[addr]; ok {
if entry.sc != sc {
// This could happen if NewSubConn was called multiple times for the
// same address, and those SubConns are all removed. We remove sc
// immediately here.
// This could happen if NewSubConn was called multiple times for
// the same address, and those SubConns are all shut down. We
// remove sc immediately here.
delete(ccc.subConnToAddr, sc)
sc.SubConn.Shutdown()
}
@ -214,7 +214,7 @@ func (ccc *lbCacheClientConn) UpdateState(s balancer.State) {
func (ccc *lbCacheClientConn) close() {
ccc.mu.Lock()
defer ccc.mu.Unlock()
// Only cancel all existing timers. There's no need to remove SubConns.
// Only cancel all existing timers. There's no need to shut down SubConns.
for _, entry := range ccc.subConnCache {
entry.cancel()
}

View File

@ -61,7 +61,7 @@ func (mcc *mockClientConn) NewSubConn(addrs []resolver.Address, opts balancer.Ne
}
func (mcc *mockClientConn) RemoveSubConn(sc balancer.SubConn) {
sc.Shutdown()
panic(fmt.Sprintf("RemoveSubConn(%v) called unexpectedly", sc))
}
const testCacheTimeout = 100 * time.Millisecond
@ -87,7 +87,7 @@ func checkCacheCC(ccc *lbCacheClientConn, sccLen, sctaLen int) error {
return nil
}
// Test that SubConn won't be immediately removed.
// Test that SubConn won't be immediately shut down.
func (s) TestLBCacheClientConnExpire(t *testing.T) {
mcc := newMockClientConn()
if err := checkMockCC(mcc, 0); err != nil {
@ -110,7 +110,7 @@ func (s) TestLBCacheClientConnExpire(t *testing.T) {
t.Fatal(err)
}
ccc.RemoveSubConn(sc)
sc.Shutdown()
// One subconn in MockCC before timeout.
if err := checkMockCC(mcc, 1); err != nil {
t.Fatal(err)
@ -138,7 +138,7 @@ func (s) TestLBCacheClientConnExpire(t *testing.T) {
}
}
// Test that NewSubConn with the same address of a SubConn being removed will
// Test that NewSubConn with the same address of a SubConn being shut down will
// reuse the SubConn and cancel the removing.
func (s) TestLBCacheClientConnReuse(t *testing.T) {
mcc := newMockClientConn()
@ -162,7 +162,7 @@ func (s) TestLBCacheClientConnReuse(t *testing.T) {
t.Fatal(err)
}
ccc.RemoveSubConn(sc)
sc.Shutdown()
// One subconn in MockCC before timeout.
if err := checkMockCC(mcc, 1); err != nil {
t.Fatal(err)
@ -195,8 +195,8 @@ func (s) TestLBCacheClientConnReuse(t *testing.T) {
t.Fatal(err)
}
// Call remove again, will delete after timeout.
ccc.RemoveSubConn(sc)
// Call Shutdown again, will delete after timeout.
sc.Shutdown()
// One subconn in MockCC before timeout.
if err := checkMockCC(mcc, 1); err != nil {
t.Fatal(err)
@ -223,9 +223,9 @@ func (s) TestLBCacheClientConnReuse(t *testing.T) {
}
}
// Test that if the timer to remove a SubConn fires at the same time NewSubConn
// cancels the timer, it doesn't cause deadlock.
func (s) TestLBCache_RemoveTimer_New_Race(t *testing.T) {
// Test that if the timer to shut down a SubConn fires at the same time
// NewSubConn cancels the timer, it doesn't cause deadlock.
func (s) TestLBCache_ShutdownTimer_New_Race(t *testing.T) {
mcc := newMockClientConn()
if err := checkMockCC(mcc, 0); err != nil {
t.Fatal(err)
@ -251,9 +251,9 @@ func (s) TestLBCache_RemoveTimer_New_Race(t *testing.T) {
go func() {
for i := 0; i < 1000; i++ {
// Remove starts a timer with 1 ns timeout, the NewSubConn will race
// with with the timer.
ccc.RemoveSubConn(sc)
// Shutdown starts a timer with 1 ns timeout, the NewSubConn will
// race with with the timer.
sc.Shutdown()
sc, _ = ccc.NewSubConn([]resolver.Address{{Addr: "address1"}}, balancer.NewSubConnOptions{})
}
close(done)

View File

@ -187,7 +187,7 @@ func (b *wrrBalancer) updateAddresses(addrs []resolver.Address) {
// addr was removed by resolver. Remove.
wsci, _ := b.subConns.Get(addr)
wsc := wsci.(*weightedSubConn)
b.cc.RemoveSubConn(wsc.SubConn)
wsc.SubConn.Shutdown()
b.subConns.Delete(addr)
}
}

View File

@ -241,12 +241,12 @@ func (s) TestWeightedTarget(t *testing.T) {
// attribute set to the config that was passed to it.
verifyAddressInNewSubConn(t, cc, setConfigKey(addr2, "cluster_2"))
// The subconn for cluster_1 should be removed.
scRemoved := <-cc.RemoveSubConnCh
if scRemoved != sc1 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved)
// The subconn for cluster_1 should be shut down.
scShutdown := <-cc.ShutdownSubConnCh
if scShutdown != sc1 {
t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scShutdown)
}
scRemoved.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
scShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
sc2 := <-cc.NewSubConnCh
sc2.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
@ -286,12 +286,12 @@ func (s) TestWeightedTarget(t *testing.T) {
}
verifyAddressInNewSubConn(t, cc, addr3)
// The subconn from the test_config_balancer should be removed.
scRemoved = <-cc.RemoveSubConnCh
if scRemoved != sc2 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved)
// The subconn from the test_config_balancer should be shut down.
scShutdown = <-cc.ShutdownSubConnCh
if scShutdown != sc2 {
t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scShutdown)
}
scRemoved.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
scShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
// Send subconn state change.
sc3 := <-cc.NewSubConnCh
@ -409,12 +409,12 @@ func (s) TestWeightedTarget_OneSubBalancer_AddRemoveBackend(t *testing.T) {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// Expect one SubConn to be removed.
scRemoved := <-cc.RemoveSubConnCh
if scRemoved != sc1 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved)
// Expect one SubConn to be shut down.
scShutdown := <-cc.ShutdownSubConnCh
if scShutdown != sc1 {
t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scShutdown)
}
scRemoved.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
scShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
p = <-cc.NewPickerCh
// Test pick with only the second SubConn.
@ -579,7 +579,7 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) {
t.Fatalf("want %v, got %v", want, err)
}
// Remove subConn corresponding to addr3.
// Shut down subConn corresponding to addr3.
if err := wtb.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
hierarchy.Set(addr1, []string{"cluster_1"}),
@ -590,11 +590,11 @@ func (s) TestWeightedTarget_TwoSubBalancers_MoreBackends(t *testing.T) {
}); err != nil {
t.Fatalf("failed to update ClientConn state: %v", err)
}
scRemoved := <-cc.RemoveSubConnCh
if scRemoved != sc3 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc3, scRemoved)
scShutdown := <-cc.ShutdownSubConnCh
if scShutdown != sc3 {
t.Fatalf("ShutdownSubConn, want %v, got %v", sc3, scShutdown)
}
scRemoved.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
scShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
p = <-cc.NewPickerCh
want = []balancer.SubConn{sc1, sc4}
if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
@ -823,9 +823,9 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) {
// picker which ensures that the removed subBalancer is not picked for RPCs.
p = <-cc.NewPickerCh
scRemoved := <-cc.RemoveSubConnCh
if scRemoved != sc2 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scRemoved)
scShutdown := <-cc.ShutdownSubConnCh
if scShutdown != sc2 {
t.Fatalf("ShutdownSubConn, want %v, got %v", sc2, scShutdown)
}
want = []balancer.SubConn{sc1, sc3}
if err := testutils.IsRoundRobin(want, testutils.SubConnFromPicker(p)); err != nil {
@ -865,9 +865,9 @@ func (s) TestWeightedTarget_ThreeSubBalancers_RemoveBalancer(t *testing.T) {
// Removing a subBalancer causes the weighted target LB policy to push a new
// picker which ensures that the removed subBalancer is not picked for RPCs.
scRemoved = <-cc.RemoveSubConnCh
if scRemoved != sc1 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scRemoved)
scShutdown = <-cc.ShutdownSubConnCh
if scShutdown != sc1 {
t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scShutdown)
}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)

View File

@ -311,23 +311,8 @@ func (ccb *ccBalancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer
}
func (ccb *ccBalancerWrapper) RemoveSubConn(sc balancer.SubConn) {
if ccb.isIdleOrClosed() {
// It it safe to ignore this call when the balancer is closed or in idle
// because the ClientConn takes care of closing the connections.
//
// Not returning early from here when the balancer is closed or in idle
// leads to a deadlock though, because of the following sequence of
// calls when holding cc.mu:
// cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close -->
// ccb.RemoveAddrConn --> cc.removeAddrConn
return
}
acbw, ok := sc.(*acBalancerWrapper)
if !ok {
return
}
ccb.cc.removeAddrConn(acbw.ac, errConnDrain)
// The graceful switch balancer will never call this.
logger.Errorf("ccb RemoveSubConn(%v) called unexpectedly, sc")
}
func (ccb *ccBalancerWrapper) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {
@ -392,7 +377,20 @@ func (acbw *acBalancerWrapper) Connect() {
}
func (acbw *acBalancerWrapper) Shutdown() {
acbw.ccb.RemoveSubConn(acbw)
ccb := acbw.ccb
if ccb.isIdleOrClosed() {
// It it safe to ignore this call when the balancer is closed or in idle
// because the ClientConn takes care of closing the connections.
//
// Not returning early from here when the balancer is closed or in idle
// leads to a deadlock though, because of the following sequence of
// calls when holding cc.mu:
// cc.exitIdleMode --> ccb.enterIdleMode --> gsw.Close -->
// ccb.RemoveAddrConn --> cc.removeAddrConn
return
}
ccb.cc.removeAddrConn(acbw.ac, errConnDrain)
}
// NewStream begins a streaming RPC on the addrConn. If the addrConn is not

View File

@ -255,7 +255,7 @@ func (gsb *Balancer) Close() {
//
// It implements the balancer.ClientConn interface and is passed down in that
// capacity to the wrapped balancer. It maintains a set of subConns created by
// the wrapped balancer and calls from the latter to create/update/remove
// the wrapped balancer and calls from the latter to create/update/shutdown
// SubConns update this set before being forwarded to the parent ClientConn.
// State updates from the wrapped balancer can result in invocation of the
// graceful switch logic.
@ -267,9 +267,10 @@ type balancerWrapper struct {
subconns map[balancer.SubConn]bool // subconns created by this balancer
}
// Close closes the underlying LB policy and removes the subconns it created. bw
// must not be referenced via balancerCurrent or balancerPending in gsb when
// called. gsb.mu must not be held. Does not panic with a nil receiver.
// Close closes the underlying LB policy and shuts down the subconns it
// created. bw must not be referenced via balancerCurrent or balancerPending in
// gsb when called. gsb.mu must not be held. Does not panic with a nil
// receiver.
func (bw *balancerWrapper) Close() {
// before Close is called.
if bw == nil {
@ -282,7 +283,7 @@ func (bw *balancerWrapper) Close() {
bw.Balancer.Close()
bw.gsb.mu.Lock()
for sc := range bw.subconns {
bw.gsb.cc.RemoveSubConn(sc)
sc.Shutdown()
}
bw.gsb.mu.Unlock()
}
@ -345,7 +346,7 @@ func (bw *balancerWrapper) NewSubConn(addrs []resolver.Address, opts balancer.Ne
}
bw.gsb.mu.Lock()
if !bw.gsb.balancerCurrentOrPending(bw) { // balancer was closed during this call
bw.gsb.cc.RemoveSubConn(sc)
sc.Shutdown()
bw.gsb.mu.Unlock()
return nil, fmt.Errorf("%T at address %p that called NewSubConn is deleted", bw, bw)
}
@ -364,6 +365,8 @@ func (bw *balancerWrapper) ResolveNow(opts resolver.ResolveNowOptions) {
}
func (bw *balancerWrapper) RemoveSubConn(sc balancer.SubConn) {
// Note: existing third party balancers may call this, so it must remain
// until RemoveSubConn is fully removed.
sc.Shutdown()
}

View File

@ -414,32 +414,32 @@ func (s) TestBalancerSubconns(t *testing.T) {
}
// balancerCurrent removing sc1 should get forwarded to the ClientConn.
gsb.balancerCurrent.Balancer.(*mockBalancer).removeSubConn(sc1)
sc1.Shutdown()
select {
case <-ctx.Done():
t.Fatalf("timeout while waiting for an UpdateAddresses call on the ClientConn")
case sc := <-tcc.RemoveSubConnCh:
case sc := <-tcc.ShutdownSubConnCh:
if sc != sc1 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, sc)
t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, sc)
}
}
// balancerPending removing sc2 should get forwarded to the ClientConn.
gsb.balancerPending.Balancer.(*mockBalancer).removeSubConn(sc2)
sc2.Shutdown()
select {
case <-ctx.Done():
t.Fatalf("timeout while waiting for an UpdateAddresses call on the ClientConn")
case sc := <-tcc.RemoveSubConnCh:
case sc := <-tcc.ShutdownSubConnCh:
if sc != sc2 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc2, sc)
t.Fatalf("ShutdownSubConn, want %v, got %v", sc2, sc)
}
}
}
// TestBalancerClose tests the graceful switch balancer's Close() functionality.
// From the Close() call, the graceful switch balancer should remove any created
// Subconns and Close() the current and pending load balancers. This Close()
// call should also cause any other events (calls to entrance functions) to be
// no-ops.
// TestBalancerClose tests the graceful switch balancer's Close()
// functionality. From the Close() call, the graceful switch balancer should
// shut down any created Subconns and Close() the current and pending load
// balancers. This Close() call should also cause any other events (calls to
// entrance functions) to be no-ops.
func (s) TestBalancerClose(t *testing.T) {
// Setup gsb balancer with current, pending, and one created SubConn on both
// current and pending.
@ -479,25 +479,25 @@ func (s) TestBalancerClose(t *testing.T) {
gsb.Close()
// The order of SubConns the graceful switch load balancer tells the Client
// Conn to remove is non deterministic, as it is stored in a map. However,
// the first SubConn removed should be either sc1 or sc2.
// Conn to shut down is non deterministic, as it is stored in a
// map. However, the first SubConn shut down should be either sc1 or sc2.
select {
case <-ctx.Done():
t.Fatalf("timeout while waiting for an UpdateAddresses call on the ClientConn")
case sc := <-tcc.RemoveSubConnCh:
case sc := <-tcc.ShutdownSubConnCh:
if sc != sc1 && sc != sc2 {
t.Fatalf("RemoveSubConn, want either %v or %v, got %v", sc1, sc2, sc)
t.Fatalf("ShutdownSubConn, want either %v or %v, got %v", sc1, sc2, sc)
}
}
// The graceful switch load balancer should then tell the ClientConn to
// remove the other SubConn.
// shut down the other SubConn.
select {
case <-ctx.Done():
t.Fatalf("timeout while waiting for an UpdateAddresses call on the ClientConn")
case sc := <-tcc.RemoveSubConnCh:
case sc := <-tcc.ShutdownSubConnCh:
if sc != sc1 && sc != sc2 {
t.Fatalf("RemoveSubConn, want either %v or %v, got %v", sc1, sc2, sc)
t.Fatalf("ShutdownSubConn, want either %v or %v, got %v", sc1, sc2, sc)
}
}
@ -615,16 +615,16 @@ func (s) TestPendingReplacedByAnotherPending(t *testing.T) {
// Replace pending with a SwitchTo() call.
gsb.SwitchTo(mockBalancerBuilder2{})
// The pending balancer being replaced should cause the graceful switch
// balancer to Remove() any created SubConns for the old pending balancer
// balancer to Shutdown() any created SubConns for the old pending balancer
// and also Close() the old pending balancer.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
select {
case <-ctx.Done():
t.Fatalf("timeout while waiting for a RemoveSubConn call on the ClientConn")
case sc := <-tcc.RemoveSubConnCh:
t.Fatalf("timeout while waiting for a SubConn.Shutdown")
case sc := <-tcc.ShutdownSubConnCh:
if sc != sc1 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, sc)
t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, sc)
}
}
@ -750,8 +750,8 @@ func (s) TestInlineCallbackInBuild(t *testing.T) {
}
select {
case <-ctx.Done():
t.Fatalf("timeout while waiting for an RemoveSubConn() call on the ClientConn")
case <-tcc.RemoveSubConnCh:
t.Fatalf("timeout while waiting for a Shutdown() call on the SubConn")
case <-tcc.ShutdownSubConnCh:
}
oldCurrent := gsb.balancerCurrent.Balancer.(*buildCallbackBal)
@ -775,8 +775,8 @@ func (s) TestInlineCallbackInBuild(t *testing.T) {
}
select {
case <-ctx.Done():
t.Fatalf("timeout while waiting for an RemoveSubConn() call on the ClientConn")
case <-tcc.RemoveSubConnCh:
t.Fatalf("timeout while waiting for a Shutdown() call on the SubConn")
case <-tcc.ShutdownSubConnCh:
}
// The current balancer should be closed as a result of the swap.
@ -965,10 +965,6 @@ func (mb1 *mockBalancer) updateAddresses(sc balancer.SubConn, addrs []resolver.A
mb1.cc.UpdateAddresses(sc, addrs)
}
func (mb1 *mockBalancer) removeSubConn(sc balancer.SubConn) {
mb1.cc.RemoveSubConn(sc)
}
type mockBalancerBuilder2 struct{}
func (mockBalancerBuilder2) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
@ -1047,7 +1043,7 @@ func (buildCallbackBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.
return nil
}
b.updateAddresses(sc, []resolver.Address{})
b.removeSubConn(sc)
sc.Shutdown()
return b
}
@ -1094,10 +1090,6 @@ func (bcb *buildCallbackBal) updateAddresses(sc balancer.SubConn, addrs []resolv
bcb.cc.UpdateAddresses(sc, addrs)
}
func (bcb *buildCallbackBal) removeSubConn(sc balancer.SubConn) {
bcb.cc.RemoveSubConn(sc)
}
// waitForClose verifies that the mockBalancer is closed before the context
// expires.
func (bcb *buildCallbackBal) waitForClose(ctx context.Context) error {

View File

@ -553,7 +553,7 @@ func (bg *BalancerGroup) Close() {
bg.incomingStarted = false
// Also remove all SubConns.
for sc := range bg.scToSubBalancer {
bg.cc.RemoveSubConn(sc)
sc.Shutdown()
delete(bg.scToSubBalancer, sc)
}
}

View File

@ -116,7 +116,7 @@ func (s) TestBalancerGroup_start_close(t *testing.T) {
gator.Stop()
bg.Close()
for i := 0; i < 4; i++ {
(<-cc.RemoveSubConnCh).UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
(<-cc.ShutdownSubConnCh).UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
}
// Add b3, weight 1, backends [1,2].
@ -244,8 +244,8 @@ func initBalancerGroupForCachingTest(t *testing.T) (*weightedaggregator.Aggregat
// removed after close timeout.
for i := 0; i < 10; i++ {
select {
case <-cc.RemoveSubConnCh:
t.Fatalf("Got request to remove subconn, want no remove subconn (because subconns were still in cache)")
case <-cc.ShutdownSubConnCh:
t.Fatalf("Got request to shut down subconn, want no shut down subconn (because subconns were still in cache)")
default:
}
time.Sleep(time.Millisecond)
@ -310,7 +310,7 @@ func (s) TestBalancerGroup_locality_caching(t *testing.T) {
}
}
// Sub-balancers are put in cache when they are removed. If balancer group is
// Sub-balancers are put in cache when they are shut down. If balancer group is
// closed within close timeout, all subconns should still be rmeoved
// immediately.
func (s) TestBalancerGroup_locality_caching_close_group(t *testing.T) {
@ -318,51 +318,51 @@ func (s) TestBalancerGroup_locality_caching_close_group(t *testing.T) {
_, bg, cc, addrToSC := initBalancerGroupForCachingTest(t)
bg.Close()
// The balancer group is closed. The subconns should be removed immediately.
removeTimeout := time.After(time.Millisecond * 500)
scToRemove := map[balancer.SubConn]int{
// The balancer group is closed. The subconns should be shutdown immediately.
shutdownTimeout := time.After(time.Millisecond * 500)
scToShutdown := map[balancer.SubConn]int{
addrToSC[testBackendAddrs[0]]: 1,
addrToSC[testBackendAddrs[1]]: 1,
addrToSC[testBackendAddrs[2]]: 1,
addrToSC[testBackendAddrs[3]]: 1,
}
for i := 0; i < len(scToRemove); i++ {
for i := 0; i < len(scToShutdown); i++ {
select {
case sc := <-cc.RemoveSubConnCh:
c := scToRemove[sc]
case sc := <-cc.ShutdownSubConnCh:
c := scToShutdown[sc]
if c == 0 {
t.Fatalf("Got removeSubConn for %v when there's %d remove expected", sc, c)
t.Fatalf("Got Shutdown for %v when there's %d shutdown expected", sc, c)
}
scToRemove[sc] = c - 1
case <-removeTimeout:
t.Fatalf("timeout waiting for subConns (from balancer in cache) to be removed")
scToShutdown[sc] = c - 1
case <-shutdownTimeout:
t.Fatalf("timeout waiting for subConns (from balancer in cache) to be shut down")
}
}
}
// Sub-balancers in cache will be closed if not re-added within timeout, and
// subConns will be removed.
// subConns will be shut down.
func (s) TestBalancerGroup_locality_caching_not_readd_within_timeout(t *testing.T) {
defer replaceDefaultSubBalancerCloseTimeout(time.Second)()
_, _, cc, addrToSC := initBalancerGroupForCachingTest(t)
// The sub-balancer is not re-added within timeout. The subconns should be
// removed.
removeTimeout := time.After(DefaultSubBalancerCloseTimeout)
scToRemove := map[balancer.SubConn]int{
// shut down.
shutdownTimeout := time.After(DefaultSubBalancerCloseTimeout)
scToShutdown := map[balancer.SubConn]int{
addrToSC[testBackendAddrs[2]]: 1,
addrToSC[testBackendAddrs[3]]: 1,
}
for i := 0; i < len(scToRemove); i++ {
for i := 0; i < len(scToShutdown); i++ {
select {
case sc := <-cc.RemoveSubConnCh:
c := scToRemove[sc]
case sc := <-cc.ShutdownSubConnCh:
c := scToShutdown[sc]
if c == 0 {
t.Fatalf("Got removeSubConn for %v when there's %d remove expected", sc, c)
t.Fatalf("Got Shutdown for %v when there's %d shutdown expected", sc, c)
}
scToRemove[sc] = c - 1
case <-removeTimeout:
t.Fatalf("timeout waiting for subConns (from balancer in cache) to be removed")
scToShutdown[sc] = c - 1
case <-shutdownTimeout:
t.Fatalf("timeout waiting for subConns (from balancer in cache) to be shut down")
}
}
}
@ -381,35 +381,35 @@ func (*noopBalancerBuilderWrapper) Name() string {
}
// After removing a sub-balancer, re-add with same ID, but different balancer
// builder. Old subconns should be removed, and new subconns should be created.
// builder. Old subconns should be shut down, and new subconns should be created.
func (s) TestBalancerGroup_locality_caching_readd_with_different_builder(t *testing.T) {
defer replaceDefaultSubBalancerCloseTimeout(10 * time.Second)()
gator, bg, cc, addrToSC := initBalancerGroupForCachingTest(t)
// Re-add sub-balancer-1, but with a different balancer builder. The
// sub-balancer was still in cache, but cann't be reused. This should cause
// old sub-balancer's subconns to be removed immediately, and new subconns
// to be created.
// old sub-balancer's subconns to be shut down immediately, and new
// subconns to be created.
gator.Add(testBalancerIDs[1], 1)
bg.Add(testBalancerIDs[1], &noopBalancerBuilderWrapper{rrBuilder})
// The cached sub-balancer should be closed, and the subconns should be
// removed immediately.
removeTimeout := time.After(time.Millisecond * 500)
scToRemove := map[balancer.SubConn]int{
// shut down immediately.
shutdownTimeout := time.After(time.Millisecond * 500)
scToShutdown := map[balancer.SubConn]int{
addrToSC[testBackendAddrs[2]]: 1,
addrToSC[testBackendAddrs[3]]: 1,
}
for i := 0; i < len(scToRemove); i++ {
for i := 0; i < len(scToShutdown); i++ {
select {
case sc := <-cc.RemoveSubConnCh:
c := scToRemove[sc]
case sc := <-cc.ShutdownSubConnCh:
c := scToShutdown[sc]
if c == 0 {
t.Fatalf("Got removeSubConn for %v when there's %d remove expected", sc, c)
t.Fatalf("Got Shutdown for %v when there's %d shutdown expected", sc, c)
}
scToRemove[sc] = c - 1
case <-removeTimeout:
t.Fatalf("timeout waiting for subConns (from balancer in cache) to be removed")
scToShutdown[sc] = c - 1
case <-shutdownTimeout:
t.Fatalf("timeout waiting for subConns (from balancer in cache) to be shut down")
}
}
@ -630,15 +630,15 @@ func (s) TestBalancerGracefulSwitch(t *testing.T) {
for i := 0; i < 2; i++ {
select {
case <-ctx.Done():
t.Fatalf("error waiting for RemoveSubConn()")
case sc := <-cc.RemoveSubConnCh:
// The SubConn removed should have been one of the two created
t.Fatalf("error waiting for Shutdown()")
case sc := <-cc.ShutdownSubConnCh:
// The SubConn shut down should have been one of the two created
// SubConns, and both should be deleted.
if ok := scs[sc]; ok {
delete(scs, sc)
continue
} else {
t.Fatalf("RemoveSubConn called for wrong SubConn %v, want in %v", sc, scs)
t.Fatalf("Shutdown called for wrong SubConn %v, want in %v", sc, scs)
}
}
}

View File

@ -34,6 +34,7 @@ import (
type testingLogger interface {
Log(args ...interface{})
Logf(format string, args ...interface{})
Errorf(format string, args ...interface{})
}
// TestSubConn implements the SubConn interface, to be used in tests.
@ -82,12 +83,12 @@ func (tsc *TestSubConn) UpdateState(state balancer.SubConnState) {
}
}
// Shutdown pushes the SubConn to the RemoveSubConn channel in the parent
// Shutdown pushes the SubConn to the ShutdownSubConn channel in the parent
// TestClientConn.
func (tsc *TestSubConn) Shutdown() {
tsc.tcc.logger.Logf("SubConn %s: Shutdown", tsc)
select {
case tsc.tcc.RemoveSubConnCh <- tsc:
case tsc.tcc.ShutdownSubConnCh <- tsc:
default:
}
}
@ -103,7 +104,7 @@ type TestClientConn struct {
NewSubConnAddrsCh chan []resolver.Address // the last 10 []Address to create subconn.
NewSubConnCh chan *TestSubConn // the last 10 subconn created.
RemoveSubConnCh chan *TestSubConn // the last 10 subconn removed.
ShutdownSubConnCh chan *TestSubConn // the last 10 subconn removed.
UpdateAddressesAddrsCh chan []resolver.Address // last updated address via UpdateAddresses().
NewPickerCh chan balancer.Picker // the last picker updated.
@ -120,7 +121,7 @@ func NewTestClientConn(t *testing.T) *TestClientConn {
NewSubConnAddrsCh: make(chan []resolver.Address, 10),
NewSubConnCh: make(chan *TestSubConn, 10),
RemoveSubConnCh: make(chan *TestSubConn, 10),
ShutdownSubConnCh: make(chan *TestSubConn, 10),
UpdateAddressesAddrsCh: make(chan []resolver.Address, 1),
NewPickerCh: make(chan balancer.Picker, 1),
@ -153,9 +154,10 @@ func (tcc *TestClientConn) NewSubConn(a []resolver.Address, o balancer.NewSubCon
return sc, nil
}
// RemoveSubConn removes the SubConn.
// RemoveSubConn is a nop; tests should all be updated to use sc.Shutdown()
// instead.
func (tcc *TestClientConn) RemoveSubConn(sc balancer.SubConn) {
sc.(*TestSubConn).Shutdown()
tcc.logger.Errorf("RemoveSubConn(%v) called unexpectedly", sc)
}
// UpdateAddresses updates the addresses on the SubConn.

View File

@ -117,9 +117,9 @@ func (b *pickfirstBalancer) UpdateClientConnState(state balancer.ClientConnState
// The resolver reported an empty address list. Treat it like an error by
// calling b.ResolverError.
if b.subConn != nil {
// Remove the old subConn. All addresses were removed, so it is no longer
// valid.
b.cc.RemoveSubConn(b.subConn)
// Shut down the old subConn. All addresses were removed, so it is
// no longer valid.
b.subConn.Shutdown()
b.subConn = nil
}
b.ResolverError(errors.New("produced zero addresses"))

View File

@ -360,11 +360,11 @@ func (s) TestBalancerSwitch_grpclbNotRegistered(t *testing.T) {
}
}
// TestBalancerSwitch_OldBalancerCallsRemoveSubConnInClose tests the scenario
// where the balancer being switched out calls RemoveSubConn() in its Close()
// TestBalancerSwitch_OldBalancerCallsShutdownInClose tests the scenario where
// the balancer being switched out calls Shutdown() in its Close()
// method. Verifies that this sequence of calls doesn't lead to a deadlock.
func (s) TestBalancerSwitch_OldBalancerCallsRemoveSubConnInClose(t *testing.T) {
// Register a stub balancer which calls RemoveSubConn() from its Close().
func (s) TestBalancerSwitch_OldBalancerCallsShutdownInClose(t *testing.T) {
// Register a stub balancer which calls Shutdown() from its Close().
scChan := make(chan balancer.SubConn, 1)
uccsCalled := make(chan struct{}, 1)
stub.Register(t.Name(), stub.BalancerFuncs{
@ -378,7 +378,7 @@ func (s) TestBalancerSwitch_OldBalancerCallsRemoveSubConnInClose(t *testing.T) {
return nil
},
Close: func(data *stub.BalancerData) {
data.ClientConn.RemoveSubConn(<-scChan)
(<-scChan).Shutdown()
},
})
@ -406,10 +406,9 @@ func (s) TestBalancerSwitch_OldBalancerCallsRemoveSubConnInClose(t *testing.T) {
// The following service config update will switch balancer from our stub
// balancer to pick_first. The former will be closed, which will call
// cc.RemoveSubConn() inline (this RemoveSubConn is not required by the API,
// but some balancers might do it).
// sc.Shutdown() inline.
//
// This is to make sure the cc.RemoveSubConn() from Close() doesn't cause a
// This is to make sure the sc.Shutdown() from Close() doesn't cause a
// deadlock (e.g. trying to grab a mutex while it's already locked).
//
// Do it in a goroutine so this test will fail with a helpful message

View File

@ -1062,58 +1062,77 @@ func (s) TestBalancerProducerHonorsContext(t *testing.T) {
}
}
// TestSubConnShutdown confirms that the Shutdown method on subconns properly
// initiates their shutdown.
// TestSubConnShutdown confirms that the Shutdown method on subconns and
// RemoveSubConn method on ClientConn properly initiates subconn shutdown.
func (s) TestSubConnShutdown(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
gotShutdown := grpcsync.NewEvent()
testCases := []struct {
name string
shutdown func(cc balancer.ClientConn, sc balancer.SubConn)
}{{
name: "ClientConn.RemoveSubConn",
shutdown: func(cc balancer.ClientConn, sc balancer.SubConn) {
cc.RemoveSubConn(sc)
},
}, {
name: "SubConn.Shutdown",
shutdown: func(_ balancer.ClientConn, sc balancer.SubConn) {
sc.Shutdown()
},
}}
bf := stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
var sc balancer.SubConn
opts := balancer.NewSubConnOptions{
StateListener: func(scs balancer.SubConnState) {
switch scs.ConnectivityState {
case connectivity.Connecting:
// Ignored.
case connectivity.Ready:
sc.Shutdown()
case connectivity.Shutdown:
gotShutdown.Fire()
default:
t.Errorf("got unexpected state %q in listener", scs.ConnectivityState)
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
gotShutdown := grpcsync.NewEvent()
bf := stub.BalancerFuncs{
UpdateClientConnState: func(bd *stub.BalancerData, ccs balancer.ClientConnState) error {
var sc balancer.SubConn
opts := balancer.NewSubConnOptions{
StateListener: func(scs balancer.SubConnState) {
switch scs.ConnectivityState {
case connectivity.Connecting:
// Ignored.
case connectivity.Ready:
tc.shutdown(bd.ClientConn, sc)
case connectivity.Shutdown:
gotShutdown.Fire()
default:
t.Errorf("got unexpected state %q in listener", scs.ConnectivityState)
}
},
}
sc, err := bd.ClientConn.NewSubConn(ccs.ResolverState.Addresses, opts)
if err != nil {
return err
}
sc.Connect()
// Report the state as READY to unblock ss.Start(), which waits for ready.
bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Ready})
return nil
},
}
sc, err := bd.ClientConn.NewSubConn(ccs.ResolverState.Addresses, opts)
if err != nil {
return err
testBalName := "shutdown-test-balancer-" + tc.name
stub.Register(testBalName, bf)
t.Logf("Registered balancer %s...", testBalName)
ss := &stubserver.StubServer{}
if err := ss.Start(nil, grpc.WithDefaultServiceConfig(
fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, testBalName),
)); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
sc.Connect()
// Report the state as READY to unblock ss.Start(), which waits for ready.
bd.ClientConn.UpdateState(balancer.State{ConnectivityState: connectivity.Ready})
return nil
},
}
defer ss.Stop()
const testBalName = "shutdown-test-balancer"
stub.Register(testBalName, bf)
t.Logf("Registered balancer %s...", testBalName)
ss := &stubserver.StubServer{}
if err := ss.Start(nil, grpc.WithDefaultServiceConfig(
fmt.Sprintf(`{ "loadBalancingConfig": [{"%v": {}}] }`, testBalName),
)); err != nil {
t.Fatalf("Error starting endpoint server: %v", err)
}
defer ss.Stop()
select {
case <-gotShutdown.Done():
// Success
case <-ctx.Done():
t.Fatalf("Timed out waiting for gotShutdown to be fired.")
select {
case <-gotShutdown.Done():
// Success
case <-ctx.Done():
t.Fatalf("Timed out waiting for gotShutdown to be fired.")
}
})
}
}

View File

@ -403,7 +403,7 @@ func (b *clusterImplBalancer) NewSubConn(addrs []resolver.Address, opts balancer
}
func (b *clusterImplBalancer) RemoveSubConn(sc balancer.SubConn) {
sc.Shutdown()
b.logger.Errorf("RemoveSubConn(%v) called unexpectedly", sc)
}
func (b *clusterImplBalancer) UpdateAddresses(sc balancer.SubConn, addrs []resolver.Address) {

View File

@ -453,7 +453,7 @@ func TestRoutingConfigUpdateDeleteAll(t *testing.T) {
select {
case <-time.After(time.Millisecond * 500):
t.Fatalf("timeout waiting for remove subconn")
case <-cc.RemoveSubConnCh:
case <-cc.ShutdownSubConnCh:
}
}
@ -729,12 +729,12 @@ func TestClusterGracefulSwitch(t *testing.T) {
defer cancel()
select {
case <-ctx.Done():
t.Fatalf("error waiting for RemoveSubConn()")
case rsc := <-cc.RemoveSubConnCh:
t.Fatalf("error waiting for sc.Shutdown()")
case rsc := <-cc.ShutdownSubConnCh:
// The SubConn removed should have been the created SubConn
// from the child before switching.
if rsc != sc1 {
t.Fatalf("RemoveSubConn() got: %v, want %v", rsc, sc1)
t.Fatalf("Shutdown() got: %v, want %v", rsc, sc1)
}
}
}

View File

@ -502,7 +502,7 @@ func (b *outlierDetectionBalancer) NewSubConn(addrs []resolver.Address, opts bal
}
func (b *outlierDetectionBalancer) RemoveSubConn(sc balancer.SubConn) {
sc.Shutdown()
b.logger.Errorf("RemoveSubConn(%v) called unexpectedly", sc)
}
// appendIfPresent appends the scw to the address, if the address is present in

View File

@ -1538,7 +1538,7 @@ func (s) TestConcurrentOperations(t *testing.T) {
wg.Add(1)
go func() {
defer wg.Done()
od.RemoveSubConn(scw1)
scw1.Shutdown()
}()
wg.Add(1)

View File

@ -143,8 +143,8 @@ func (s) TestPriority_HighPriorityReady(t *testing.T) {
select {
case sc := <-cc.NewSubConnCh:
t.Fatalf("got unexpected new SubConn: %s", sc)
case sc := <-cc.RemoveSubConnCh:
t.Fatalf("got unexpected remove SubConn: %v", sc)
case sc := <-cc.ShutdownSubConnCh:
t.Fatalf("got unexpected shutdown SubConn: %v", sc)
case <-time.After(time.Millisecond * 100):
}
@ -175,8 +175,8 @@ func (s) TestPriority_HighPriorityReady(t *testing.T) {
select {
case <-cc.NewSubConnCh:
t.Fatalf("got unexpected new SubConn")
case <-cc.RemoveSubConnCh:
t.Fatalf("got unexpected remove SubConn")
case <-cc.ShutdownSubConnCh:
t.Fatalf("got unexpected shutdown SubConn")
case <-time.After(time.Millisecond * 100):
}
@ -279,8 +279,8 @@ func (s) TestPriority_SwitchPriority(t *testing.T) {
select {
case sc := <-cc.NewSubConnCh:
t.Fatalf("got unexpected new SubConn, %s", sc)
case <-cc.RemoveSubConnCh:
t.Fatalf("got unexpected remove SubConn")
case <-cc.ShutdownSubConnCh:
t.Fatalf("got unexpected shutdown SubConn")
case <-time.After(time.Millisecond * 100):
}
@ -325,10 +325,10 @@ func (s) TestPriority_SwitchPriority(t *testing.T) {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// p2 SubConns are removed.
scToRemove := <-cc.RemoveSubConnCh
if scToRemove != sc2 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc2, scToRemove)
// p2 SubConns are shut down.
scToShutdown := <-cc.ShutdownSubConnCh
if scToShutdown != sc2 {
t.Fatalf("ShutdownSubConn, want %v, got %v", sc2, scToShutdown)
}
// Should get an update with 1's old transient failure picker, to override
@ -423,10 +423,10 @@ func (s) TestPriority_HighPriorityToConnectingFromReady(t *testing.T) {
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Connecting})
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
// p1 subconn should be removed.
scToRemove := <-cc.RemoveSubConnCh
if scToRemove != sc1 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc0, scToRemove)
// p1 subconn should be shut down.
scToShutdown := <-cc.ShutdownSubConnCh
if scToShutdown != sc1 {
t.Fatalf("ShutdownSubConn, want %v, got %v", sc0, scToShutdown)
}
if err := cc.WaitForRoundRobinPicker(ctx, sc0); err != nil {
@ -612,13 +612,13 @@ func (s) TestPriority_HigherReadyCloseAllLower(t *testing.T) {
// When 0 becomes ready, 0 should be used, 1 and 2 should all be closed.
sc0.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Ready})
// sc1 and sc2 should be removed.
// sc1 and sc2 should be shut down.
//
// With localities caching, the lower priorities are closed after a timeout,
// in goroutines. The order is no longer guaranteed.
scToRemove := []balancer.SubConn{<-cc.RemoveSubConnCh, <-cc.RemoveSubConnCh}
if !(scToRemove[0] == sc1 && scToRemove[1] == sc2) && !(scToRemove[0] == sc2 && scToRemove[1] == sc1) {
t.Errorf("RemoveSubConn, want [%v, %v], got %v", sc1, sc2, scToRemove)
scToShutdown := []balancer.SubConn{<-cc.ShutdownSubConnCh, <-cc.ShutdownSubConnCh}
if !(scToShutdown[0] == sc1 && scToShutdown[1] == sc2) && !(scToShutdown[0] == sc2 && scToShutdown[1] == sc1) {
t.Errorf("ShutdownSubConn, want [%v, %v], got %v", sc1, sc2, scToShutdown)
}
// Test pick with 0.
@ -765,10 +765,10 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// p0 subconn should be removed.
scToRemove := <-cc.RemoveSubConnCh
if scToRemove != sc0 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc0, scToRemove)
// p0 subconn should be shut down.
scToShutdown := <-cc.ShutdownSubConnCh
if scToShutdown != sc0 {
t.Fatalf("ShutdownSubConn, want %v, got %v", sc0, scToShutdown)
}
// Test pick return TransientFailure.
@ -836,10 +836,10 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// p1 subconn should be removed.
scToRemove1 := <-cc.RemoveSubConnCh
if scToRemove1 != sc11 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc11, scToRemove1)
// p1 subconn should be shut down.
scToShutdown1 := <-cc.ShutdownSubConnCh
if scToShutdown1 != sc11 {
t.Fatalf("ShutdownSubConn, want %v, got %v", sc11, scToShutdown1)
}
// Test pick return NoSubConn.
@ -862,8 +862,8 @@ func (s) TestPriority_RemovesAllPriorities(t *testing.T) {
t.Fatalf("got unexpected new picker")
case <-cc.NewSubConnCh:
t.Fatalf("got unexpected new SubConn")
case <-cc.RemoveSubConnCh:
t.Fatalf("got unexpected remove SubConn")
case <-cc.ShutdownSubConnCh:
t.Fatalf("got unexpected shutdown SubConn")
case <-time.After(time.Millisecond * 100):
}
}
@ -931,10 +931,10 @@ func (s) TestPriority_HighPriorityNoEndpoints(t *testing.T) {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// p0 will remove the subconn, and ClientConn will send a sc update to
// p0 will shutdown the subconn, and ClientConn will send a sc update to
// shutdown.
scToRemove := <-cc.RemoveSubConnCh
scToRemove.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
scToShutdown := <-cc.ShutdownSubConnCh
scToShutdown.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.Shutdown})
addrs2 := <-cc.NewSubConnAddrsCh
if got, want := addrs2[0].Addr, testBackendAddrStrs[1]; got != want {
@ -1079,10 +1079,10 @@ func (s) TestPriority_MoveChildToHigherPriority(t *testing.T) {
t.Fatal(err.Error())
}
// Old subconn should be removed.
scToRemove := <-cc.RemoveSubConnCh
if scToRemove != sc1 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove)
// Old subconn should be shut down.
scToShutdown := <-cc.ShutdownSubConnCh
if scToShutdown != sc1 {
t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scToShutdown)
}
addrs2 := <-cc.NewSubConnAddrsCh
@ -1181,9 +1181,9 @@ func (s) TestPriority_MoveReadyChildToHigherPriority(t *testing.T) {
}
// Old subconn from child-0 should be removed.
scToRemove := <-cc.RemoveSubConnCh
if scToRemove != sc0 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc0, scToRemove)
scToShutdown := <-cc.ShutdownSubConnCh
if scToShutdown != sc0 {
t.Fatalf("ShutdownSubConn, want %v, got %v", sc0, scToShutdown)
}
// Because this was a ready child moved to a higher priority, no new subconn
@ -1191,8 +1191,8 @@ func (s) TestPriority_MoveReadyChildToHigherPriority(t *testing.T) {
select {
case <-cc.NewSubConnCh:
t.Fatalf("got unexpected new SubConn")
case <-cc.RemoveSubConnCh:
t.Fatalf("got unexpected remove SubConn")
case <-cc.ShutdownSubConnCh:
t.Fatalf("got unexpected shutdown SubConn")
case <-time.After(time.Millisecond * 100):
}
}
@ -1273,10 +1273,10 @@ func (s) TestPriority_RemoveReadyLowestChild(t *testing.T) {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// Old subconn from child-1 should be removed.
scToRemove := <-cc.RemoveSubConnCh
if scToRemove != sc1 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove)
// Old subconn from child-1 should be shut down.
scToShutdown := <-cc.ShutdownSubConnCh
if scToShutdown != sc1 {
t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scToShutdown)
}
if err := cc.WaitForErrPicker(ctx); err != nil {
@ -1363,8 +1363,8 @@ func (s) TestPriority_ReadyChildRemovedButInCache(t *testing.T) {
select {
case sc := <-cc.NewSubConnCh:
t.Fatalf("got unexpected new SubConn: %s", sc)
case sc := <-cc.RemoveSubConnCh:
t.Fatalf("got unexpected remove SubConn: %v", sc)
case sc := <-cc.ShutdownSubConnCh:
t.Fatalf("got unexpected shutdown SubConn: %v", sc)
case <-time.After(time.Millisecond * 100):
}
@ -1395,8 +1395,8 @@ func (s) TestPriority_ReadyChildRemovedButInCache(t *testing.T) {
select {
case sc := <-cc.NewSubConnCh:
t.Fatalf("got unexpected new SubConn: %s", sc)
case sc := <-cc.RemoveSubConnCh:
t.Fatalf("got unexpected remove SubConn: %v", sc)
case sc := <-cc.ShutdownSubConnCh:
t.Fatalf("got unexpected shutdown SubConn: %v", sc)
case <-time.After(time.Millisecond * 100):
}
}
@ -1463,10 +1463,10 @@ func (s) TestPriority_ChildPolicyChange(t *testing.T) {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// Old subconn should be removed.
scToRemove := <-cc.RemoveSubConnCh
if scToRemove != sc1 {
t.Fatalf("RemoveSubConn, want %v, got %v", sc1, scToRemove)
// Old subconn should be shut down.
scToShutdown := <-cc.ShutdownSubConnCh
if scToShutdown != sc1 {
t.Fatalf("ShutdownSubConn, want %v, got %v", sc1, scToShutdown)
}
// A new subconn should be created.

View File

@ -252,7 +252,7 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
if _, ok := addrsSet.Get(addr); !ok {
v, _ := b.subConns.Get(addr)
scInfo := v.(*subConn)
b.cc.RemoveSubConn(scInfo.sc)
scInfo.sc.Shutdown()
b.subConns.Delete(addr)
addrsUpdated = true
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
@ -354,8 +354,8 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance
// Save error to be reported via picker.
b.connErr = state.ConnectionError
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scStates. Remove state for this sc here.
// When an address was removed by resolver, b called Shutdown but kept
// the sc's state in scStates. Remove state for this sc here.
delete(b.scStates, sc)
}