mirror of https://github.com/grpc/grpc-go.git
ringhash: don't recreate subConns when update doesn't change address information (#5431)
This commit is contained in:
parent
a6dcb714b2
commit
0d04c6f5d4
|
|
@ -43,8 +43,8 @@ type ringEntry struct {
|
|||
sc *subConn
|
||||
}
|
||||
|
||||
// newRing creates a ring from the subConns. The ring size is limited by the
|
||||
// passed in max/min.
|
||||
// newRing creates a ring from the subConns stored in the AddressMap. The ring
|
||||
// size is limited by the passed in max/min.
|
||||
//
|
||||
// ring entries will be created for each subConn, and subConn with high weight
|
||||
// (specified by the address) may have multiple entries.
|
||||
|
|
@ -64,7 +64,7 @@ type ringEntry struct {
|
|||
//
|
||||
// To pick from a ring, a binary search will be done for the given target hash,
|
||||
// and first item with hash >= given hash will be returned.
|
||||
func newRing(subConns map[resolver.Address]*subConn, minRingSize, maxRingSize uint64) (*ring, error) {
|
||||
func newRing(subConns *resolver.AddressMap, minRingSize, maxRingSize uint64) (*ring, error) {
|
||||
// https://github.com/envoyproxy/envoy/blob/765c970f06a4c962961a0e03a467e165b276d50f/source/common/upstream/ring_hash_lb.cc#L114
|
||||
normalizedWeights, minWeight, err := normalizeWeights(subConns)
|
||||
if err != nil {
|
||||
|
|
@ -95,7 +95,7 @@ func newRing(subConns map[resolver.Address]*subConn, minRingSize, maxRingSize ui
|
|||
for _, scw := range normalizedWeights {
|
||||
targetIdx += scale * scw.weight
|
||||
for float64(idx) < targetIdx {
|
||||
h := xxhash.Sum64String(scw.sc.addr + strconv.Itoa(len(items)))
|
||||
h := xxhash.Sum64String(scw.sc.addr + strconv.Itoa(idx))
|
||||
items = append(items, &ringEntry{idx: idx, hash: h, sc: scw.sc})
|
||||
idx++
|
||||
}
|
||||
|
|
@ -111,26 +111,26 @@ func newRing(subConns map[resolver.Address]*subConn, minRingSize, maxRingSize ui
|
|||
|
||||
// normalizeWeights divides all the weights by the sum, so that the total weight
|
||||
// is 1.
|
||||
func normalizeWeights(subConns map[resolver.Address]*subConn) (_ []subConnWithWeight, min float64, _ error) {
|
||||
if len(subConns) == 0 {
|
||||
func normalizeWeights(subConns *resolver.AddressMap) ([]subConnWithWeight, float64, error) {
|
||||
keys := subConns.Keys()
|
||||
if len(keys) == 0 {
|
||||
return nil, 0, fmt.Errorf("number of subconns is 0")
|
||||
}
|
||||
var weightSum uint32
|
||||
for a := range subConns {
|
||||
// The address weight was moved from attributes to the Metadata field.
|
||||
// This is necessary (all the attributes need to be stripped) for the
|
||||
// balancer to detect identical {address+weight} combination.
|
||||
weightSum += a.Metadata.(uint32)
|
||||
for _, a := range keys {
|
||||
weightSum += getWeightAttribute(a)
|
||||
}
|
||||
if weightSum == 0 {
|
||||
return nil, 0, fmt.Errorf("total weight of all subconns is 0")
|
||||
}
|
||||
weightSumF := float64(weightSum)
|
||||
ret := make([]subConnWithWeight, 0, len(subConns))
|
||||
min = math.MaxFloat64
|
||||
for a, sc := range subConns {
|
||||
nw := float64(a.Metadata.(uint32)) / weightSumF
|
||||
ret = append(ret, subConnWithWeight{sc: sc, weight: nw})
|
||||
ret := make([]subConnWithWeight, 0, len(keys))
|
||||
min := float64(1.0)
|
||||
for _, a := range keys {
|
||||
v, _ := subConns.Get(a)
|
||||
scInfo := v.(*subConn)
|
||||
nw := float64(getWeightAttribute(a)) / weightSumF
|
||||
ret = append(ret, subConnWithWeight{sc: scInfo, weight: nw})
|
||||
if nw < min {
|
||||
min = nw
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,25 +24,31 @@ import (
|
|||
"testing"
|
||||
|
||||
xxhash "github.com/cespare/xxhash/v2"
|
||||
"google.golang.org/grpc/balancer/weightedroundrobin"
|
||||
"google.golang.org/grpc/resolver"
|
||||
)
|
||||
|
||||
func testAddr(addr string, weight uint32) resolver.Address {
|
||||
return resolver.Address{Addr: addr, Metadata: weight}
|
||||
}
|
||||
var testAddrs []resolver.Address
|
||||
var testSubConnMap *resolver.AddressMap
|
||||
|
||||
func (s) TestRingNew(t *testing.T) {
|
||||
testAddrs := []resolver.Address{
|
||||
func init() {
|
||||
testAddrs = []resolver.Address{
|
||||
testAddr("a", 3),
|
||||
testAddr("b", 3),
|
||||
testAddr("c", 4),
|
||||
}
|
||||
testSubConnMap = resolver.NewAddressMap()
|
||||
testSubConnMap.Set(testAddrs[0], &subConn{addr: "a"})
|
||||
testSubConnMap.Set(testAddrs[1], &subConn{addr: "b"})
|
||||
testSubConnMap.Set(testAddrs[2], &subConn{addr: "c"})
|
||||
}
|
||||
|
||||
func testAddr(addr string, weight uint32) resolver.Address {
|
||||
return weightedroundrobin.SetAddrInfo(resolver.Address{Addr: addr}, weightedroundrobin.AddrInfo{Weight: weight})
|
||||
}
|
||||
|
||||
func (s) TestRingNew(t *testing.T) {
|
||||
var totalWeight float64 = 10
|
||||
testSubConnMap := map[resolver.Address]*subConn{
|
||||
testAddr("a", 3): {addr: "a"},
|
||||
testAddr("b", 3): {addr: "b"},
|
||||
testAddr("c", 4): {addr: "c"},
|
||||
}
|
||||
for _, min := range []uint64{3, 4, 6, 8} {
|
||||
for _, max := range []uint64{20, 8} {
|
||||
t.Run(fmt.Sprintf("size-min-%v-max-%v", min, max), func(t *testing.T) {
|
||||
|
|
@ -59,7 +65,7 @@ func (s) TestRingNew(t *testing.T) {
|
|||
}
|
||||
}
|
||||
got := float64(count) / float64(totalCount)
|
||||
want := float64(a.Metadata.(uint32)) / totalWeight
|
||||
want := float64(getWeightAttribute(a)) / totalWeight
|
||||
if !equalApproximately(got, want) {
|
||||
t.Fatalf("unexpected item weight in ring: %v != %v", got, want)
|
||||
}
|
||||
|
|
@ -76,11 +82,7 @@ func equalApproximately(x, y float64) bool {
|
|||
}
|
||||
|
||||
func (s) TestRingPick(t *testing.T) {
|
||||
r, _ := newRing(map[resolver.Address]*subConn{
|
||||
{Addr: "a", Metadata: uint32(3)}: {addr: "a"},
|
||||
{Addr: "b", Metadata: uint32(3)}: {addr: "b"},
|
||||
{Addr: "c", Metadata: uint32(4)}: {addr: "c"},
|
||||
}, 10, 20)
|
||||
r, _ := newRing(testSubConnMap, 10, 20)
|
||||
for _, h := range []uint64{xxhash.Sum64String("1"), xxhash.Sum64String("2"), xxhash.Sum64String("3"), xxhash.Sum64String("4")} {
|
||||
t.Run(fmt.Sprintf("picking-hash-%v", h), func(t *testing.T) {
|
||||
e := r.pick(h)
|
||||
|
|
@ -98,11 +100,7 @@ func (s) TestRingPick(t *testing.T) {
|
|||
}
|
||||
|
||||
func (s) TestRingNext(t *testing.T) {
|
||||
r, _ := newRing(map[resolver.Address]*subConn{
|
||||
{Addr: "a", Metadata: uint32(3)}: {addr: "a"},
|
||||
{Addr: "b", Metadata: uint32(3)}: {addr: "b"},
|
||||
{Addr: "c", Metadata: uint32(4)}: {addr: "c"},
|
||||
}, 10, 20)
|
||||
r, _ := newRing(testSubConnMap, 10, 20)
|
||||
|
||||
for _, e := range r.items {
|
||||
ne := r.next(e)
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ type bb struct{}
|
|||
func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
|
||||
b := &ringhashBalancer{
|
||||
cc: cc,
|
||||
subConns: make(map[resolver.Address]*subConn),
|
||||
subConns: resolver.NewAddressMap(),
|
||||
scStates: make(map[balancer.SubConn]*subConn),
|
||||
csEvltr: &connectivityStateEvaluator{},
|
||||
}
|
||||
|
|
@ -65,8 +65,9 @@ func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, err
|
|||
}
|
||||
|
||||
type subConn struct {
|
||||
addr string
|
||||
sc balancer.SubConn
|
||||
addr string
|
||||
weight uint32
|
||||
sc balancer.SubConn
|
||||
|
||||
mu sync.RWMutex
|
||||
// This is the actual state of this SubConn (as updated by the ClientConn).
|
||||
|
|
@ -178,9 +179,8 @@ type ringhashBalancer struct {
|
|||
cc balancer.ClientConn
|
||||
logger *grpclog.PrefixLogger
|
||||
|
||||
config *LBConfig
|
||||
|
||||
subConns map[resolver.Address]*subConn // `attributes` is stripped from the keys of this map (the addresses)
|
||||
config *LBConfig
|
||||
subConns *resolver.AddressMap // Map from resolver.Address to `*subConn`.
|
||||
scStates map[balancer.SubConn]*subConn
|
||||
|
||||
// ring is always in sync with subConns. When subConns change, a new ring is
|
||||
|
|
@ -208,55 +208,47 @@ type ringhashBalancer struct {
|
|||
// SubConn states are Idle.
|
||||
func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
|
||||
var addrsUpdated bool
|
||||
// addrsSet is the set converted from addrs, it's used for quick lookup of
|
||||
// an address.
|
||||
//
|
||||
// Addresses in this map all have attributes stripped, but metadata set to
|
||||
// the weight. So that weight change can be detected.
|
||||
//
|
||||
// TODO: this won't be necessary if there are ways to compare address
|
||||
// attributes.
|
||||
addrsSet := make(map[resolver.Address]struct{})
|
||||
for _, a := range addrs {
|
||||
aNoAttrs := a
|
||||
// Strip attributes but set Metadata to the weight.
|
||||
aNoAttrs.Attributes = nil
|
||||
w := weightedroundrobin.GetAddrInfo(a).Weight
|
||||
if w == 0 {
|
||||
// If weight is not set, use 1.
|
||||
w = 1
|
||||
}
|
||||
aNoAttrs.Metadata = w
|
||||
addrsSet[aNoAttrs] = struct{}{}
|
||||
if scInfo, ok := b.subConns[aNoAttrs]; !ok {
|
||||
// When creating SubConn, the original address with attributes is
|
||||
// passed through. So that connection configurations in attributes
|
||||
// (like creds) will be used.
|
||||
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: true})
|
||||
// addrsSet is the set converted from addrs, used for quick lookup.
|
||||
addrsSet := resolver.NewAddressMap()
|
||||
for _, addr := range addrs {
|
||||
addrsSet.Set(addr, true)
|
||||
newWeight := getWeightAttribute(addr)
|
||||
if val, ok := b.subConns.Get(addr); !ok {
|
||||
sc, err := b.cc.NewSubConn([]resolver.Address{addr}, balancer.NewSubConnOptions{HealthCheckEnabled: true})
|
||||
if err != nil {
|
||||
logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err)
|
||||
continue
|
||||
}
|
||||
scs := &subConn{addr: a.Addr, sc: sc}
|
||||
scs := &subConn{addr: addr.Addr, weight: newWeight, sc: sc}
|
||||
scs.setState(connectivity.Idle)
|
||||
b.state = b.csEvltr.recordTransition(connectivity.Shutdown, connectivity.Idle)
|
||||
b.subConns[aNoAttrs] = scs
|
||||
b.subConns.Set(addr, scs)
|
||||
b.scStates[sc] = scs
|
||||
addrsUpdated = true
|
||||
} else {
|
||||
// Always update the subconn's address in case the attributes
|
||||
// changed. The SubConn does a reflect.DeepEqual of the new and old
|
||||
// addresses. So this is a noop if the current address is the same
|
||||
// as the old one (including attributes).
|
||||
b.subConns[aNoAttrs] = scInfo
|
||||
b.cc.UpdateAddresses(scInfo.sc, []resolver.Address{a})
|
||||
// We have seen this address before and created a subConn for it. If the
|
||||
// weight associated with the address has changed, update the subConns map
|
||||
// with the new weight. This will be used when a new ring is created.
|
||||
//
|
||||
// There is no need to call UpdateAddresses on the subConn at this point
|
||||
// since *only* the weight attribute has changed, and that does not affect
|
||||
// subConn uniqueness.
|
||||
scInfo := val.(*subConn)
|
||||
if oldWeight := scInfo.weight; oldWeight != newWeight {
|
||||
scInfo.weight = newWeight
|
||||
b.subConns.Set(addr, scInfo)
|
||||
// Return true to force recreation of the ring.
|
||||
addrsUpdated = true
|
||||
}
|
||||
}
|
||||
}
|
||||
for a, scInfo := range b.subConns {
|
||||
// a was removed by resolver.
|
||||
if _, ok := addrsSet[a]; !ok {
|
||||
for _, addr := range b.subConns.Keys() {
|
||||
// addr was removed by resolver.
|
||||
if _, ok := addrsSet.Get(addr); !ok {
|
||||
v, _ := b.subConns.Get(addr)
|
||||
scInfo := v.(*subConn)
|
||||
b.cc.RemoveSubConn(scInfo.sc)
|
||||
delete(b.subConns, a)
|
||||
b.subConns.Delete(addr)
|
||||
addrsUpdated = true
|
||||
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
|
||||
// The entry will be deleted in UpdateSubConnState.
|
||||
|
|
@ -304,7 +296,7 @@ func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) err
|
|||
|
||||
func (b *ringhashBalancer) ResolverError(err error) {
|
||||
b.resolverErr = err
|
||||
if len(b.subConns) == 0 {
|
||||
if b.subConns.Len() == 0 {
|
||||
b.state = connectivity.TransientFailure
|
||||
}
|
||||
|
||||
|
|
@ -392,7 +384,8 @@ func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balance
|
|||
// attempting to connect, we need to trigger one. But since the deleted
|
||||
// SubConn will eventually send a shutdown update, this code will run
|
||||
// and trigger the next SubConn to connect.
|
||||
for _, sc := range b.subConns {
|
||||
for _, v := range b.subConns.Values() {
|
||||
sc := v.(*subConn)
|
||||
if sc.isAttemptingToConnect() {
|
||||
return
|
||||
}
|
||||
|
|
@ -485,3 +478,18 @@ func (cse *connectivityStateEvaluator) recordTransition(oldState, newState conne
|
|||
}
|
||||
return connectivity.TransientFailure
|
||||
}
|
||||
|
||||
// getWeightAttribute is a convenience function which returns the value of the
|
||||
// weight attribute stored in the BalancerAttributes field of addr, using the
|
||||
// weightedroundrobin package.
|
||||
//
|
||||
// When used in the xDS context, the weight attribute is guaranteed to be
|
||||
// non-zero. But, when used in a non-xDS context, the weight attribute could be
|
||||
// unset. A Default of 1 is used in the latter case.
|
||||
func getWeightAttribute(addr resolver.Address) uint32 {
|
||||
w := weightedroundrobin.GetAddrInfo(addr).Weight
|
||||
if w == 0 {
|
||||
return 1
|
||||
}
|
||||
return w
|
||||
}
|
||||
|
|
|
|||
|
|
@ -33,6 +33,7 @@ import (
|
|||
"google.golang.org/grpc/internal/grpctest"
|
||||
"google.golang.org/grpc/internal/testutils"
|
||||
"google.golang.org/grpc/resolver"
|
||||
"google.golang.org/grpc/xds/internal"
|
||||
)
|
||||
|
||||
var (
|
||||
|
|
@ -491,3 +492,26 @@ func (s) TestConnectivityStateEvaluatorRecordTransition(t *testing.T) {
|
|||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestAddrBalancerAttributesChange tests the case where the ringhash balancer
|
||||
// receives a ClientConnUpdate with the same config and addresses as received in
|
||||
// the previous update. Although the `BalancerAttributes` contents are the same,
|
||||
// the pointer is different. This test verifies that subConns are not recreated
|
||||
// in this scenario.
|
||||
func (s) TestAddrBalancerAttributesChange(t *testing.T) {
|
||||
addrs1 := []resolver.Address{internal.SetLocalityID(resolver.Address{Addr: testBackendAddrStrs[0]}, internal.LocalityID{Region: "americas"})}
|
||||
cc, b, _ := setupTest(t, addrs1)
|
||||
|
||||
addrs2 := []resolver.Address{internal.SetLocalityID(resolver.Address{Addr: testBackendAddrStrs[0]}, internal.LocalityID{Region: "americas"})}
|
||||
if err := b.UpdateClientConnState(balancer.ClientConnState{
|
||||
ResolverState: resolver.State{Addresses: addrs2},
|
||||
BalancerConfig: nil,
|
||||
}); err != nil {
|
||||
t.Fatalf("UpdateClientConnState returned err: %v", err)
|
||||
}
|
||||
select {
|
||||
case <-cc.NewSubConnCh:
|
||||
t.Fatal("new subConn created for an update with the same addresses")
|
||||
case <-time.After(defaultTestShortTimeout):
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in New Issue