balancergroup: improve observability around balancer cache behavior (#6597)

This commit is contained in:
Easwar Swaminathan 2023-08-31 11:27:03 -07:00 committed by GitHub
parent aa6ce35c79
commit 778e638122
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 47 additions and 1 deletions

View File

@ -328,6 +328,11 @@ func (bg *BalancerGroup) AddWithClientConn(id, balancerName string, cc balancer.
// caching is disabled. // caching is disabled.
if bg.outgoingStarted && bg.deletedBalancerCache != nil { if bg.outgoingStarted && bg.deletedBalancerCache != nil {
if old, ok := bg.deletedBalancerCache.Remove(id); ok { if old, ok := bg.deletedBalancerCache.Remove(id); ok {
if bg.logger.V(2) {
bg.logger.Infof("Removing and reusing child policy of type %q for locality %q from the balancer cache", balancerName, id)
bg.logger.Infof("Number of items remaining in the balancer cache: %d", bg.deletedBalancerCache.Len())
}
sbc, _ = old.(*subBalancerWrapper) sbc, _ = old.(*subBalancerWrapper)
if sbc != nil && sbc.builder != builder { if sbc != nil && sbc.builder != builder {
// If the sub-balancer in cache was built with a different // If the sub-balancer in cache was built with a different
@ -403,7 +408,7 @@ func (bg *BalancerGroup) Remove(id string) {
sbToRemove, ok := bg.idToBalancerConfig[id] sbToRemove, ok := bg.idToBalancerConfig[id]
if !ok { if !ok {
bg.logger.Infof("balancer group: trying to remove a non-existing locality from balancer group: %v", id) bg.logger.Errorf("Child policy for locality %q does not exist in the balancer group", id)
bg.outgoingMu.Unlock() bg.outgoingMu.Unlock()
return return
} }
@ -418,7 +423,17 @@ func (bg *BalancerGroup) Remove(id string) {
} }
if bg.deletedBalancerCache != nil { if bg.deletedBalancerCache != nil {
if bg.logger.V(2) {
bg.logger.Infof("Adding child policy for locality %q to the balancer cache", id)
bg.logger.Infof("Number of items remaining in the balancer cache: %d", bg.deletedBalancerCache.Len())
}
bg.deletedBalancerCache.Add(id, sbToRemove, func() { bg.deletedBalancerCache.Add(id, sbToRemove, func() {
if bg.logger.V(2) {
bg.logger.Infof("Removing child policy for locality %q from the balancer cache after timeout", id)
bg.logger.Infof("Number of items remaining in the balancer cache: %d", bg.deletedBalancerCache.Len())
}
// A sub-balancer evicted from the timeout cache needs to be closed // A sub-balancer evicted from the timeout cache needs to be closed
// and its subConns need to be removed, unconditionally. There is a // and its subConns need to be removed, unconditionally. There is a
// possibility that a sub-balancer might be removed (thereby // possibility that a sub-balancer might be removed (thereby

View File

@ -142,3 +142,10 @@ func (c *TimeoutCache) Clear(runCallback bool) {
entry.callback() entry.callback()
} }
} }
// Len reports how many entries the cache currently holds. The count is read
// while holding c.mu.
func (c *TimeoutCache) Len() int {
	c.mu.Lock()
	n := len(c.cache)
	c.mu.Unlock()
	return n
}

View File

@ -58,6 +58,9 @@ func (s) TestCacheExpire(t *testing.T) {
if gotV, ok := c.getForTesting(k); !ok || gotV.item != v { if gotV, ok := c.getForTesting(k); !ok || gotV.item != v {
t.Fatalf("After Add(), before timeout, from cache got: %v, %v, want %v, %v", gotV.item, ok, v, true) t.Fatalf("After Add(), before timeout, from cache got: %v, %v, want %v, %v", gotV.item, ok, v, true)
} }
if l := c.Len(); l != 1 {
t.Fatalf("%d number of items in the cache, want 1", l)
}
select { select {
case <-callbackChan: case <-callbackChan:
@ -68,6 +71,9 @@ func (s) TestCacheExpire(t *testing.T) {
if _, ok := c.getForTesting(k); ok { if _, ok := c.getForTesting(k); ok {
t.Fatalf("After Add(), after timeout, from cache got: _, %v, want _, %v", ok, false) t.Fatalf("After Add(), after timeout, from cache got: _, %v, want _, %v", ok, false)
} }
if l := c.Len(); l != 0 {
t.Fatalf("%d number of items in the cache, want 0", l)
}
} }
// TestCacheRemove attempts to remove an existing entry from the cache and // TestCacheRemove attempts to remove an existing entry from the cache and
@ -83,6 +89,9 @@ func (s) TestCacheRemove(t *testing.T) {
if got, ok := c.getForTesting(k); !ok || got.item != v { if got, ok := c.getForTesting(k); !ok || got.item != v {
t.Fatalf("After Add(), before timeout, from cache got: %v, %v, want %v, %v", got.item, ok, v, true) t.Fatalf("After Add(), before timeout, from cache got: %v, %v, want %v, %v", got.item, ok, v, true)
} }
if l := c.Len(); l != 1 {
t.Fatalf("%d number of items in the cache, want 1", l)
}
time.Sleep(testCacheTimeout / 2) time.Sleep(testCacheTimeout / 2)
@ -94,6 +103,9 @@ func (s) TestCacheRemove(t *testing.T) {
if _, ok := c.getForTesting(k); ok { if _, ok := c.getForTesting(k); ok {
t.Fatalf("After Add(), before timeout, after Remove(), from cache got: _, %v, want _, %v", ok, false) t.Fatalf("After Add(), before timeout, after Remove(), from cache got: _, %v, want _, %v", ok, false)
} }
if l := c.Len(); l != 0 {
t.Fatalf("%d number of items in the cache, want 0", l)
}
select { select {
case <-callbackChan: case <-callbackChan:
@ -133,6 +145,9 @@ func (s) TestCacheClearWithoutCallback(t *testing.T) {
t.Fatalf("After Add(), before timeout, from cache got: %v, %v, want %v, %v", got.item, ok, v, true) t.Fatalf("After Add(), before timeout, from cache got: %v, %v, want %v, %v", got.item, ok, v, true)
} }
} }
if l := c.Len(); l != itemCount {
t.Fatalf("%d number of items in the cache, want %d", l, itemCount)
}
time.Sleep(testCacheTimeout / 2) time.Sleep(testCacheTimeout / 2)
c.Clear(false) c.Clear(false)
@ -142,6 +157,9 @@ func (s) TestCacheClearWithoutCallback(t *testing.T) {
t.Fatalf("After Add(), before timeout, after Remove(), from cache got: _, %v, want _, %v", ok, false) t.Fatalf("After Add(), before timeout, after Remove(), from cache got: _, %v, want _, %v", ok, false)
} }
} }
if l := c.Len(); l != 0 {
t.Fatalf("%d number of items in the cache, want 0", l)
}
select { select {
case <-callbackChan: case <-callbackChan:
@ -188,6 +206,9 @@ func (s) TestCacheClearWithCallback(t *testing.T) {
t.Fatalf("After Add(), before timeout, from cache got: %v, %v, want %v, %v", got.item, ok, v, true) t.Fatalf("After Add(), before timeout, from cache got: %v, %v, want %v, %v", got.item, ok, v, true)
} }
} }
if l := c.Len(); l != itemCount {
t.Fatalf("%d number of items in the cache, want %d", l, itemCount)
}
time.Sleep(testCacheTimeout / 2) time.Sleep(testCacheTimeout / 2)
c.Clear(true) c.Clear(true)
@ -197,6 +218,9 @@ func (s) TestCacheClearWithCallback(t *testing.T) {
t.Fatalf("After Add(), before timeout, after Remove(), from cache got: _, %v, want _, %v", ok, false) t.Fatalf("After Add(), before timeout, after Remove(), from cache got: _, %v, want _, %v", ok, false)
} }
} }
if l := c.Len(); l != 0 {
t.Fatalf("%d number of items in the cache, want 0", l)
}
select { select {
case <-allGoroutineDone: case <-allGoroutineDone: