priority: release references to child policies which are removed (#5682)

This commit is contained in:
Easwar Swaminathan 2022-10-06 13:23:45 -07:00 committed by GitHub
parent 5fc798be17
commit c03925db8d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 164 additions and 133 deletions

View File

@ -97,7 +97,7 @@ func (sbc *subBalancerWrapper) startBalancer() {
if sbc.balancer == nil {
sbc.balancer = gracefulswitch.NewBalancer(sbc, sbc.buildOpts)
}
sbc.group.logger.Infof("Creating child policy of type %v", sbc.builder.Name())
sbc.group.logger.Infof("Creating child policy of type %q for locality %q", sbc.builder.Name(), sbc.id)
sbc.balancer.SwitchTo(sbc.builder)
if sbc.ccState != nil {
sbc.balancer.UpdateClientConnState(*sbc.ccState)
@ -298,10 +298,22 @@ func (bg *BalancerGroup) Start() {
bg.outgoingMu.Unlock()
}
// Add adds a balancer built by builder to the group, with given id.
func (bg *BalancerGroup) Add(id string, builder balancer.Builder) {
// AddWithClientConn adds a balancer with the given id to the group. The
// balancer is built with a balancer builder registered with balancerName. The
// given ClientConn is passed to the newly built balancer instead of the
// one passed to balancergroup.New().
//
// TODO: Get rid of the existing Add() API and replace it with this.
func (bg *BalancerGroup) AddWithClientConn(id, balancerName string, cc balancer.ClientConn) error {
bg.logger.Infof("Adding child policy of type %q for locality %q", balancerName, id)
builder := balancer.Get(balancerName)
if builder == nil {
return fmt.Errorf("unregistered balancer name %q", balancerName)
}
// Store data in static map, and then check to see if bg is started.
bg.outgoingMu.Lock()
defer bg.outgoingMu.Unlock()
var sbc *subBalancerWrapper
// If outgoingStarted is true, search in the cache. Otherwise, cache is
// guaranteed to be empty, searching is unnecessary.
@ -326,7 +338,7 @@ func (bg *BalancerGroup) Add(id string, builder balancer.Builder) {
}
if sbc == nil {
sbc = &subBalancerWrapper{
ClientConn: bg.cc,
ClientConn: cc,
id: id,
group: bg,
builder: builder,
@ -343,11 +355,18 @@ func (bg *BalancerGroup) Add(id string, builder balancer.Builder) {
sbc.updateBalancerStateWithCachedPicker()
}
bg.idToBalancerConfig[id] = sbc
bg.outgoingMu.Unlock()
return nil
}
// Add adds a balancer built by builder to the group, with given id.
//
// It delegates to AddWithClientConn, passing builder.Name() and the group's
// own ClientConn (bg.cc). AddWithClientConn looks the builder up again by name
// through balancer.Get and returns an error when no builder is registered
// under that name; that error is discarded here, so such a failure is silent
// to callers of Add.
func (bg *BalancerGroup) Add(id string, builder balancer.Builder) {
bg.AddWithClientConn(id, builder.Name(), bg.cc)
}
// UpdateBuilder updates the builder for a current child, starting the Graceful
// Switch process for that child.
//
// TODO: update this API to take the name of the new builder instead.
func (bg *BalancerGroup) UpdateBuilder(id string, builder balancer.Builder) {
bg.outgoingMu.Lock()
// This does not deal with the balancer cache because this call should come
@ -369,6 +388,7 @@ func (bg *BalancerGroup) UpdateBuilder(id string, builder balancer.Builder) {
// closed after timeout. Cleanup work (closing sub-balancer and removing
// subconns) will be done after timeout.
func (bg *BalancerGroup) Remove(id string) {
bg.logger.Infof("Removing child policy for locality %q", id)
bg.outgoingMu.Lock()
if sbToRemove, ok := bg.idToBalancerConfig[id]; ok {
if bg.outgoingStarted {

View File

@ -374,11 +374,19 @@ func (s) TestBalancerGroup_locality_caching_not_readd_within_timeout(t *testing.
}
}
// Wrap the rr builder, so it behaves the same, but has a different pointer.
// Wrap the rr builder, so it behaves the same, but has a different name.
type noopBalancerBuilderWrapper struct {
balancer.Builder
}
// init registers a noopBalancerBuilderWrapper around rrBuilder with the
// balancer registry, making it retrievable by the name returned from its
// Name() method.
func init() {
balancer.Register(&noopBalancerBuilderWrapper{Builder: rrBuilder})
}
// Name overrides the Name() method of the embedded balancer.Builder and
// returns the fixed string "noopBalancerBuilderWrapper".
func (*noopBalancerBuilderWrapper) Name() string {
return "noopBalancerBuilderWrapper"
}
// After removing a sub-balancer, re-add with same ID, but different balancer
// builder. Old subconns should be removed, and new subconns should be created.
func (s) TestBalancerGroup_locality_caching_readd_with_different_builder(t *testing.T) {

View File

@ -331,6 +331,7 @@ func (b *clusterImplBalancer) Close() {
if b.childLB != nil {
b.childLB.Close()
b.childLB = nil
b.childState = balancer.State{}
}
<-b.done.Done()
b.logger.Infof("Shutdown")

View File

@ -194,7 +194,6 @@ func (b *clusterResolverBalancer) handleWatchUpdate(update *resourceUpdate) {
return
}
b.logger.Infof("resource update: %+v", pretty.ToJSON(update.priorities))
b.watchUpdateReceived = true
b.priorities = update.priorities

View File

@ -127,7 +127,7 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err
// This is a new child, add it to the children list. But note that
// the balancer isn't built, because this child can be a low
// priority. If necessary, it will be built when syncing priorities.
cb := newChildBalancer(name, b, bb)
cb := newChildBalancer(name, b, bb.Name(), b.cc)
cb.updateConfig(newSubConfig, resolver.State{
Addresses: addressesSplit[name],
ServiceConfig: s.ResolverState.ServiceConfig,
@ -141,9 +141,9 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err
// The balancing policy name is changed, close the old child. But don't
// rebuild, rebuild will happen when syncing priorities.
if currentChild.bb.Name() != bb.Name() {
if currentChild.balancerName != bb.Name() {
currentChild.stop()
currentChild.updateBuilder(bb)
currentChild.updateBalancerName(bb.Name())
}
// Update config and address, but note that this doesn't send the
@ -155,10 +155,11 @@ func (b *priorityBalancer) UpdateClientConnState(s balancer.ClientConnState) err
Attributes: s.ResolverState.Attributes,
})
}
// Remove child from children if it's not in new config.
// Cleanup resources used by children removed from the config.
for name, oldChild := range b.children {
if _, ok := newConfig.Children[name]; !ok {
oldChild.stop()
delete(b.children, name)
}
}

View File

@ -29,9 +29,11 @@ import (
)
type childBalancer struct {
name string
parent *priorityBalancer
bb *ignoreResolveNowBalancerBuilder
name string
parent *priorityBalancer
parentCC balancer.ClientConn
balancerName string
cc *ignoreResolveNowClientConn
ignoreReresolutionRequests bool
config serviceconfig.LoadBalancingConfig
@ -53,12 +55,14 @@ type childBalancer struct {
// newChildBalancer creates a child balancer place holder, but doesn't
// build/start the child balancer.
func newChildBalancer(name string, parent *priorityBalancer, bb balancer.Builder) *childBalancer {
func newChildBalancer(name string, parent *priorityBalancer, balancerName string, cc balancer.ClientConn) *childBalancer {
return &childBalancer{
name: name,
parent: parent,
bb: newIgnoreResolveNowBalancerBuilder(bb, false),
started: false,
name: name,
parent: parent,
parentCC: cc,
balancerName: balancerName,
cc: newIgnoreResolveNowClientConn(cc, false),
started: false,
// Start with the connecting state and picker with re-pick error, so
// that when a priority switch causes this child to be picked before its
// balancing policy is created, a re-pick will happen.
@ -69,9 +73,13 @@ func newChildBalancer(name string, parent *priorityBalancer, bb balancer.Builder
}
}
// updateBuilder updates builder for the child, but doesn't build.
func (cb *childBalancer) updateBuilder(bb balancer.Builder) {
cb.bb = newIgnoreResolveNowBalancerBuilder(bb, cb.ignoreReresolutionRequests)
// updateBalancerName updates balancer name for the child, but doesn't build a
// new one. The parent priority LB always closes the child policy before
// updating the balancer name, and the new balancer is built when it gets added
// to the balancergroup as part of start().
//
// Besides recording the new name, this replaces cb.cc with a fresh
// ignoreResolveNowClientConn wrapping parentCC, seeded with the child's current
// ignoreReresolutionRequests value.
func (cb *childBalancer) updateBalancerName(balancerName string) {
cb.balancerName = balancerName
// cb.cc is what start() hands to the balancergroup when the child is added,
// so the next balancer built for this child receives this new wrapper.
cb.cc = newIgnoreResolveNowClientConn(cb.parentCC, cb.ignoreReresolutionRequests)
}
// updateConfig sets childBalancer's config and state, but doesn't send update to
@ -93,14 +101,14 @@ func (cb *childBalancer) start() {
return
}
cb.started = true
cb.parent.bg.Add(cb.name, cb.bb)
cb.parent.bg.AddWithClientConn(cb.name, cb.balancerName, cb.cc)
cb.startInitTimer()
cb.sendUpdate()
}
// sendUpdate sends the addresses and config to the child balancer.
func (cb *childBalancer) sendUpdate() {
cb.bb.updateIgnoreResolveNow(cb.ignoreReresolutionRequests)
cb.cc.updateIgnoreResolveNow(cb.ignoreReresolutionRequests)
// TODO: return and aggregate the returned error in the parent.
err := cb.parent.bg.UpdateClientConnState(cb.name, balancer.ClientConnState{
ResolverState: cb.rState,

View File

@ -162,6 +162,10 @@ func (b *priorityBalancer) handleChildStateUpdate(childName string, s balancer.S
b.logger.Warningf("priority: child balancer not found for child %v", childName)
return
}
if !child.started {
b.logger.Warningf("priority: ignoring update from child %q which is not in started state: %+v", childName, s)
return
}
child.state = s
// We start/stop the init timer of this child based on the new connectivity

View File

@ -37,7 +37,10 @@ import (
"google.golang.org/grpc/resolver"
)
const defaultTestTimeout = 5 * time.Second
const (
defaultTestTimeout = 5 * time.Second
defaultTestShortTimeout = 100 * time.Millisecond
)
type s struct {
grpctest.Tester
@ -1525,11 +1528,21 @@ func (s) TestPriority_ChildPolicyUpdatePickerInline(t *testing.T) {
}
}
// When the child policy's configured to ignore reresolution requests, the
// ResolveNow() calls from this child should be all ignored.
// TestPriority_IgnoreReresolutionRequest tests the case where the priority
// policy has a single child policy. The test verifies that ResolveNow() calls
// from the child policy are ignored based on the value of the
// IgnoreReresolutionRequests field in the configuration.
func (s) TestPriority_IgnoreReresolutionRequest(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Register a stub balancer to act as the child policy of the priority policy.
// Provide an init function to the stub balancer to capture the ClientConn
// passed to the child policy.
ccCh := testutils.NewChannel()
childPolicyName := t.Name()
stub.Register(childPolicyName, stub.BalancerFuncs{
Init: func(data *stub.BalancerData) {
ccCh.Send(data.ClientConn)
},
})
cc := testutils.NewTestClientConn(t)
bb := balancer.Get(Name)
@ -1547,7 +1560,7 @@ func (s) TestPriority_IgnoreReresolutionRequest(t *testing.T) {
BalancerConfig: &LBConfig{
Children: map[string]*Child{
"child-0": {
Config: &internalserviceconfig.BalancerConfig{Name: resolveNowBalancerName},
Config: &internalserviceconfig.BalancerConfig{Name: childPolicyName},
IgnoreReresolutionRequests: true,
},
},
@ -1557,13 +1570,14 @@ func (s) TestPriority_IgnoreReresolutionRequest(t *testing.T) {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// This is the balancer.ClientConn that the inner resolverNowBalancer is
// built with.
balancerCCI, err := resolveNowBalancerCCCh.Receive(ctx)
// Retrieve the ClientConn passed to the child policy.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
val, err := ccCh.Receive(ctx)
if err != nil {
t.Fatalf("timeout waiting for ClientConn from balancer builder")
t.Fatalf("timeout waiting for ClientConn from the child policy")
}
balancerCC := balancerCCI.(balancer.ClientConn)
balancerCC := val.(balancer.ClientConn)
// Since IgnoreReresolutionRequests was set to true, all ResolveNow() calls
// should be ignored.
@ -1573,7 +1587,7 @@ func (s) TestPriority_IgnoreReresolutionRequest(t *testing.T) {
select {
case <-cc.ResolveNowCh:
t.Fatalf("got unexpected ResolveNow() call")
case <-time.After(time.Millisecond * 100):
case <-time.After(defaultTestShortTimeout):
}
// Send another update to set IgnoreReresolutionRequests to false.
@ -1586,7 +1600,7 @@ func (s) TestPriority_IgnoreReresolutionRequest(t *testing.T) {
BalancerConfig: &LBConfig{
Children: map[string]*Child{
"child-0": {
Config: &internalserviceconfig.BalancerConfig{Name: resolveNowBalancerName},
Config: &internalserviceconfig.BalancerConfig{Name: childPolicyName},
IgnoreReresolutionRequests: false,
},
},
@ -1606,12 +1620,38 @@ func (s) TestPriority_IgnoreReresolutionRequest(t *testing.T) {
}
// When the child policy's configured to ignore reresolution requests, the
// ResolveNow() calls from this child should be all ignored, from the other
// children are forwarded.
type wrappedRoundRobinBalancerBuilder struct {
// name is the value reported by Name().
name string
// ccCh receives every balancer.ClientConn passed to Build().
ccCh *testutils.Channel
}
// Build sends cc on w.ccCh so that the test can get hold of the ClientConn the
// child policy was built with, and then returns a wrappedRoundRobinBalancer
// whose embedded Balancer is built, with the same cc and opts, by the builder
// registered under roundrobin.Name.
func (w *wrappedRoundRobinBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
w.ccCh.Send(cc)
rrBuilder := balancer.Get(roundrobin.Name)
return &wrappedRoundRobinBalancer{Balancer: rrBuilder.Build(cc, opts)}
}
// Name returns the name stored in w.name.
func (w *wrappedRoundRobinBalancerBuilder) Name() string {
return w.name
}
// wrappedRoundRobinBalancer is the balancer type returned by
// wrappedRoundRobinBalancerBuilder.Build(). It embeds the balancer.Balancer
// produced by the round robin builder.
type wrappedRoundRobinBalancer struct {
balancer.Balancer
}
// TestPriority_IgnoreReresolutionRequestTwoChildren tests the case where the
// priority policy has two child policies, one of them has the
// IgnoreReresolutionRequests field set to true while the other one has it set
// to false. The test verifies that ResolveNow() calls from the child which is
// set to ignore reresolution requests are ignored, while calls from the other
// child are processed.
func (s) TestPriority_IgnoreReresolutionRequestTwoChildren(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
// Register a wrapping balancer to act as the child policy of the priority
// policy. The wrapping balancer builder's Build() method pushes the
// balancer.ClientConn on a channel for this test to use.
ccCh := testutils.NewChannel()
childPolicyName := t.Name()
balancer.Register(&wrappedRoundRobinBalancerBuilder{name: childPolicyName, ccCh: ccCh})
cc := testutils.NewTestClientConn(t)
bb := balancer.Get(Name)
@ -1630,11 +1670,11 @@ func (s) TestPriority_IgnoreReresolutionRequestTwoChildren(t *testing.T) {
BalancerConfig: &LBConfig{
Children: map[string]*Child{
"child-0": {
Config: &internalserviceconfig.BalancerConfig{Name: resolveNowBalancerName},
Config: &internalserviceconfig.BalancerConfig{Name: childPolicyName},
IgnoreReresolutionRequests: true,
},
"child-1": {
Config: &internalserviceconfig.BalancerConfig{Name: resolveNowBalancerName},
Config: &internalserviceconfig.BalancerConfig{Name: childPolicyName},
},
},
Priorities: []string{"child-0", "child-1"},
@ -1643,12 +1683,14 @@ func (s) TestPriority_IgnoreReresolutionRequestTwoChildren(t *testing.T) {
t.Fatalf("failed to update ClientConn state: %v", err)
}
// This is the balancer.ClientConn from p0.
balancerCCI0, err := resolveNowBalancerCCCh.Receive(ctx)
// Retrieve the ClientConn passed to the child policy from p0.
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
val, err := ccCh.Receive(ctx)
if err != nil {
t.Fatalf("timeout waiting for ClientConn from balancer builder 0")
t.Fatalf("timeout waiting for ClientConn from the child policy")
}
balancerCC0 := balancerCCI0.(balancer.ClientConn)
balancerCC0 := val.(balancer.ClientConn)
// Set p0 to transient failure, p1 will be started.
addrs0 := <-cc.NewSubConnAddrsCh
@ -1658,14 +1700,12 @@ func (s) TestPriority_IgnoreReresolutionRequestTwoChildren(t *testing.T) {
sc0 := <-cc.NewSubConnCh
pb.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
// This is the balancer.ClientConn from p1.
ctx1, cancel1 := context.WithTimeout(context.Background(), time.Second)
defer cancel1()
balancerCCI1, err := resolveNowBalancerCCCh.Receive(ctx1)
// Retrieve the ClientConn passed to the child policy from p1.
val, err = ccCh.Receive(ctx)
if err != nil {
t.Fatalf("timeout waiting for ClientConn from balancer builder 1")
t.Fatalf("timeout waiting for ClientConn from the child policy")
}
balancerCC1 := balancerCCI1.(balancer.ClientConn)
balancerCC1 := val.(balancer.ClientConn)
// Since IgnoreReresolutionRequests was set to true for p0, ResolveNow()
// from p0 should all be ignored.
@ -1675,7 +1715,7 @@ func (s) TestPriority_IgnoreReresolutionRequestTwoChildren(t *testing.T) {
select {
case <-cc.ResolveNowCh:
t.Fatalf("got unexpected ResolveNow() call")
case <-time.After(time.Millisecond * 100):
case <-time.After(defaultTestShortTimeout):
}
// But IgnoreReresolutionRequests was false for p1, ResolveNow() from p1
@ -1683,7 +1723,7 @@ func (s) TestPriority_IgnoreReresolutionRequestTwoChildren(t *testing.T) {
balancerCC1.ResolveNow(resolver.ResolveNowOptions{})
select {
case <-cc.ResolveNowCh:
case <-time.After(time.Second):
case <-time.After(defaultTestShortTimeout):
t.Fatalf("timeout waiting for ResolveNow()")
}
}

View File

@ -25,46 +25,31 @@ import (
"google.golang.org/grpc/resolver"
)
type ignoreResolveNowBalancerBuilder struct {
balancer.Builder
// ignoreResolveNowClientConn wraps a balancer.ClientConn and overrides the
// ResolveNow() method to ignore those calls if the ignoreResolveNow bit is set.
type ignoreResolveNowClientConn struct {
balancer.ClientConn
// ignoreResolveNow points to a flag that is read and written with the
// sync/atomic functions: updateIgnoreResolveNow stores 1 or 0, and
// ResolveNow() returns early when the loaded value is non-zero.
ignoreResolveNow *uint32
}
// If `ignore` is true, all `ResolveNow()` from the balancer built from this
// builder will be ignored.
//
// `ignore` can be updated later by `updateIgnoreResolveNow`, and the update
// will be propagated to all the old and new balancers built with this.
func newIgnoreResolveNowBalancerBuilder(bb balancer.Builder, ignore bool) *ignoreResolveNowBalancerBuilder {
ret := &ignoreResolveNowBalancerBuilder{
Builder: bb,
func newIgnoreResolveNowClientConn(cc balancer.ClientConn, ignore bool) *ignoreResolveNowClientConn {
ret := &ignoreResolveNowClientConn{
ClientConn: cc,
ignoreResolveNow: new(uint32),
}
ret.updateIgnoreResolveNow(ignore)
return ret
}
func (irnbb *ignoreResolveNowBalancerBuilder) updateIgnoreResolveNow(b bool) {
func (i *ignoreResolveNowClientConn) updateIgnoreResolveNow(b bool) {
if b {
atomic.StoreUint32(irnbb.ignoreResolveNow, 1)
atomic.StoreUint32(i.ignoreResolveNow, 1)
return
}
atomic.StoreUint32(irnbb.ignoreResolveNow, 0)
atomic.StoreUint32(i.ignoreResolveNow, 0)
}
func (irnbb *ignoreResolveNowBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
return irnbb.Builder.Build(&ignoreResolveNowClientConn{
ClientConn: cc,
ignoreResolveNow: irnbb.ignoreResolveNow,
}, opts)
}
type ignoreResolveNowClientConn struct {
balancer.ClientConn
ignoreResolveNow *uint32
}
func (i ignoreResolveNowClientConn) ResolveNow(o resolver.ResolveNowOptions) {
if atomic.LoadUint32(i.ignoreResolveNow) != 0 {
return

View File

@ -23,81 +23,44 @@ import (
"testing"
"time"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/roundrobin"
"google.golang.org/grpc/internal/testutils"
"google.golang.org/grpc/resolver"
)
const resolveNowBalancerName = "test-resolve-now-balancer"
var resolveNowBalancerCCCh = testutils.NewChannel()
type resolveNowBalancerBuilder struct {
balancer.Builder
}
func (r *resolveNowBalancerBuilder) Build(cc balancer.ClientConn, opts balancer.BuildOptions) balancer.Balancer {
resolveNowBalancerCCCh.Send(cc)
return r.Builder.Build(cc, opts)
}
func (r *resolveNowBalancerBuilder) Name() string {
return resolveNowBalancerName
}
func init() {
balancer.Register(&resolveNowBalancerBuilder{
Builder: balancer.Get(roundrobin.Name),
})
}
func (s) TestIgnoreResolveNowBalancerBuilder(t *testing.T) {
resolveNowBB := balancer.Get(resolveNowBalancerName)
// Create a build wrapper, but will not ignore ResolveNow().
ignoreResolveNowBB := newIgnoreResolveNowBalancerBuilder(resolveNowBB, false)
func (s) TestIgnoreResolveNowClientConn(t *testing.T) {
cc := testutils.NewTestClientConn(t)
tb := ignoreResolveNowBB.Build(cc, balancer.BuildOptions{})
defer tb.Close()
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
// This is the balancer.ClientConn that the inner resolverNowBalancer is
// built with.
balancerCCI, err := resolveNowBalancerCCCh.Receive(ctx)
if err != nil {
t.Fatalf("timeout waiting for ClientConn from balancer builder")
}
balancerCC := balancerCCI.(balancer.ClientConn)
ignoreCC := newIgnoreResolveNowClientConn(cc, false)
// Call ResolveNow() on the CC, it should be forwarded.
balancerCC.ResolveNow(resolver.ResolveNowOptions{})
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
ignoreCC.ResolveNow(resolver.ResolveNowOptions{})
select {
case <-cc.ResolveNowCh:
case <-time.After(time.Second):
t.Fatalf("timeout waiting for ResolveNow()")
case <-ctx.Done():
t.Fatalf("Timeout waiting for ResolveNow()")
}
// Update ignoreResolveNow to true, call ResolveNow() on the CC, they should
// all be ignored.
ignoreResolveNowBB.updateIgnoreResolveNow(true)
ignoreCC.updateIgnoreResolveNow(true)
for i := 0; i < 5; i++ {
balancerCC.ResolveNow(resolver.ResolveNowOptions{})
ignoreCC.ResolveNow(resolver.ResolveNowOptions{})
}
select {
case <-cc.ResolveNowCh:
t.Fatalf("got unexpected ResolveNow() call")
case <-time.After(time.Millisecond * 100):
case <-time.After(defaultTestShortTimeout):
}
// Update ignoreResolveNow to false, new ResolveNow() calls should be
// forwarded.
ignoreResolveNowBB.updateIgnoreResolveNow(false)
balancerCC.ResolveNow(resolver.ResolveNowOptions{})
ignoreCC.updateIgnoreResolveNow(false)
ignoreCC.ResolveNow(resolver.ResolveNowOptions{})
select {
case <-cc.ResolveNowCh:
case <-time.After(time.Second):
case <-ctx.Done():
t.Fatalf("timeout waiting for ResolveNow()")
}
}

View File

@ -441,7 +441,9 @@ func (b *ringhashBalancer) regeneratePicker() {
b.picker = newPicker(b.ring, b.logger)
}
func (b *ringhashBalancer) Close() {}
func (b *ringhashBalancer) Close() {
b.logger.Infof("Shutdown")
}
func (b *ringhashBalancer) ExitIdle() {
// ExitIdle implementation is a no-op because connections are either