ringhash: handle config updates properly (#5557)

This commit is contained in:
Easwar Swaminathan 2022-08-04 12:11:45 -07:00 committed by GitHub
parent 946dde008f
commit f9409d385f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 81 additions and 45 deletions

View File

@ -19,7 +19,6 @@
package ringhash
import (
"fmt"
"math"
"sort"
"strconv"
@ -64,12 +63,12 @@ type ringEntry struct {
//
// To pick from a ring, a binary search will be done for the given target hash,
// and first item with hash >= given hash will be returned.
func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64) (*ring, error) {
//
// Must be called with a non-empty subConns map.
func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64) *ring {
// https://github.com/envoyproxy/envoy/blob/765c970f06a4c962961a0e03a467e165b276d50f/source/common/upstream/ring_hash_lb.cc#L114
normalizedWeights, minWeight, err := normalizeWeights(subConns)
if err != nil {
return nil, err
}
normalizedWeights, minWeight := normalizeWeights(subConns)
// Normalized weights for {3,3,4} is {0.3,0.3,0.4}.
// Scale up the size of the ring such that the least-weighted host gets a
@ -106,30 +105,29 @@ func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64) (*r
for i, ii := range items {
ii.idx = i
}
return &ring{items: items}, nil
return &ring{items: items}
}
// normalizeWeights divides all the weights by the sum, so that the total weight
// is 1.
func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float64, error) {
keys := subConns.Keys()
if len(keys) == 0 {
return nil, 0, fmt.Errorf("number of subconns is 0")
}
//
// Must be called with a non-empty subConns map.
func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float64) {
var weightSum uint32
keys := subConns.Keys()
for _, a := range keys {
weightSum += getWeightAttribute(a)
}
if weightSum == 0 {
return nil, 0, fmt.Errorf("total weight of all subconns is 0")
}
weightSumF := float64(weightSum)
ret := make([]subConnWithWeight, 0, len(keys))
min := float64(1.0)
for _, a := range keys {
v, _ := subConns.Get(a)
scInfo := v.(*subConn)
nw := float64(getWeightAttribute(a)) / weightSumF
// getWeightAttribute() returns 1 if the weight attribute is not found
// on the address. And since this function is guaranteed to be called
// with a non-empty subConns map, weightSum is guaranteed to be
// non-zero. So, we need not worry about divide a by zero error here.
nw := float64(getWeightAttribute(a)) / float64(weightSum)
ret = append(ret, subConnWithWeight{sc: scInfo, weight: nw})
if nw < min {
min = nw
@ -142,7 +140,7 @@ func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float
// where an address is added and then removed, the RPCs will still pick the
// same old SubConn.
sort.Slice(ret, func(i, j int) bool { return ret[i].sc.addr < ret[j].sc.addr })
return ret, min, nil
return ret, min
}
// pick does a binary search. It returns the item with smallest index i that

View File

@ -52,7 +52,7 @@ func (s) TestRingNew(t *testing.T) {
for _, min := range []uint64{3, 4, 6, 8} {
for _, max := range []uint64{20, 8} {
t.Run(fmt.Sprintf("size-min-%v-max-%v", min, max), func(t *testing.T) {
r, _ := newRing(testSubConnMap, min, max)
r := newRing(testSubConnMap, min, max)
totalCount := len(r.items)
if totalCount < int(min) || totalCount > int(max) {
t.Fatalf("unexpect size %v, want min %v, max %v", totalCount, min, max)
@ -82,7 +82,7 @@ func equalApproximately(x, y float64) bool {
}
func (s) TestRingPick(t *testing.T) {
r, _ := newRing(testSubConnMap, 10, 20)
r := newRing(testSubConnMap, 10, 20)
for _, h := range []uint64{xxhash.Sum64String("1"), xxhash.Sum64String("2"), xxhash.Sum64String("3"), xxhash.Sum64String("4")} {
t.Run(fmt.Sprintf("picking-hash-%v", h), func(t *testing.T) {
e := r.pick(h)
@ -100,7 +100,7 @@ func (s) TestRingPick(t *testing.T) {
}
func (s) TestRingNext(t *testing.T) {
r, _ := newRing(testSubConnMap, 10, 20)
r := newRing(testSubConnMap, 10, 20)
for _, e := range r.items {
ne := r.next(e)

View File

@ -259,29 +259,22 @@ func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(s.BalancerConfig))
if b.config == nil {
newConfig, ok := s.BalancerConfig.(*LBConfig)
if !ok {
return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
}
b.config = newConfig
newConfig, ok := s.BalancerConfig.(*LBConfig)
if !ok {
return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
}
// Successful resolution; clear resolver error and ensure we return nil.
b.resolverErr = nil
if b.updateAddresses(s.ResolverState.Addresses) {
// If addresses were updated, no matter whether it resulted in SubConn
// creation/deletion, or just weight update, we will need to regenerate
// the ring.
var err error
b.ring, err = newRing(b.subConns, b.config.MinRingSize, b.config.MaxRingSize)
if err != nil {
b.ResolverError(fmt.Errorf("ringhash failed to make a new ring: %v", err))
return balancer.ErrBadResolverState
}
b.regeneratePicker()
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
// If addresses were updated, whether it resulted in SubConn
// creation/deletion, or just weight update, we need to regenerate the ring
// and send a new picker.
regenerateRing := b.updateAddresses(s.ResolverState.Addresses)
// If the ring configuration has changed, we need to regenerate the ring and
// send a new picker.
if b.config == nil || b.config.MinRingSize != newConfig.MinRingSize || b.config.MaxRingSize != newConfig.MaxRingSize {
regenerateRing = true
}
b.config = newConfig
// If resolver state contains no addresses, return an error so ClientConn
// will trigger re-resolve. Also records this as an resolver error, so when
@ -291,6 +284,17 @@ func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) err
b.ResolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}
if regenerateRing {
// Ring creation is guaranteed to not fail because we call newRing()
// with a non-empty subConns map.
b.ring = newRing(b.subConns, b.config.MinRingSize, b.config.MaxRingSize)
b.regeneratePicker()
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
}
// Successful resolution; clear resolver error and return nil.
b.resolverErr = nil
return nil
}

View File

@ -108,6 +108,40 @@ func Test(t *testing.T) {
grpctest.RunSubTests(t, s{})
}
// TestUpdateClientConnState_NewRingSize tests the scenario where the ringhash
// LB policy receives new configuration which specifies new values for the ring
// min and max sizes. The test verifies that a new ring is created and a new
// picker is sent to the ClientConn.
func (s) TestUpdateClientConnState_NewRingSize(t *testing.T) {
origMinRingSize, origMaxRingSize := 1, 10 // Configured from `testConfig` in `setupTest`
newMinRingSize, newMaxRingSize := 20, 100
addrs := []resolver.Address{{Addr: testBackendAddrStrs[0]}}
cc, b, p1 := setupTest(t, addrs)
ring1 := p1.(*picker).ring
if ringSize := len(ring1.items); ringSize < origMinRingSize || ringSize > origMaxRingSize {
t.Fatalf("Ring created with size %d, want between [%d, %d]", ringSize, origMinRingSize, origMaxRingSize)
}
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs},
BalancerConfig: &LBConfig{MinRingSize: uint64(newMinRingSize), MaxRingSize: uint64(newMaxRingSize)},
}); err != nil {
t.Fatalf("UpdateClientConnState returned err: %v", err)
}
var ring2 *ring
select {
case <-time.After(defaultTestTimeout):
t.Fatal("Timeout when waiting for a picker update after a configuration update")
case p2 := <-cc.NewPickerCh:
ring2 = p2.(*picker).ring
}
if ringSize := len(ring2.items); ringSize < newMinRingSize || ringSize > newMaxRingSize {
t.Fatalf("Ring created with size %d, want between [%d, %d]", ringSize, newMinRingSize, newMaxRingSize)
}
}
func (s) TestOneSubConn(t *testing.T) {
wantAddr1 := resolver.Address{Addr: testBackendAddrStrs[0]}
cc, b, p0 := setupTest(t, []resolver.Address{wantAddr1})
@ -320,7 +354,7 @@ func (s) TestAddrWeightChange(t *testing.T) {
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: wantAddrs},
BalancerConfig: nil,
BalancerConfig: testConfig,
}); err != nil {
t.Fatalf("UpdateClientConnState returned err: %v", err)
}
@ -336,7 +370,7 @@ func (s) TestAddrWeightChange(t *testing.T) {
{Addr: testBackendAddrStrs[0]},
{Addr: testBackendAddrStrs[1]},
}},
BalancerConfig: nil,
BalancerConfig: testConfig,
}); err != nil {
t.Fatalf("UpdateClientConnState returned err: %v", err)
}
@ -359,7 +393,7 @@ func (s) TestAddrWeightChange(t *testing.T) {
resolver.Address{Addr: testBackendAddrStrs[1]},
weightedroundrobin.AddrInfo{Weight: 2}),
}},
BalancerConfig: nil,
BalancerConfig: testConfig,
}); err != nil {
t.Fatalf("UpdateClientConnState returned err: %v", err)
}
@ -505,7 +539,7 @@ func (s) TestAddrBalancerAttributesChange(t *testing.T) {
addrs2 := []resolver.Address{internal.SetLocalityID(resolver.Address{Addr: testBackendAddrStrs[0]}, internal.LocalityID{Region: "americas"})}
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs2},
BalancerConfig: nil,
BalancerConfig: testConfig,
}); err != nil {
t.Fatalf("UpdateClientConnState returned err: %v", err)
}