ringhash: the balancer (#4741)

Menghan Li 2021-09-15 14:47:18 -07:00 committed by GitHub
parent 4c5f7fb0ee
commit 4f093b9a5a
5 changed files with 938 additions and 32 deletions

View File

@@ -0,0 +1,34 @@
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package ringhash
import (
"fmt"
"google.golang.org/grpc/grpclog"
internalgrpclog "google.golang.org/grpc/internal/grpclog"
)
const prefix = "[ring-hash-lb %p] "
var logger = grpclog.Component("xds")
func prefixLogger(p *ringhashBalancer) *internalgrpclog.PrefixLogger {
return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p))
}

View File

@@ -24,11 +24,17 @@ import (
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/status"
)
type picker struct {
ring *ring
ring *ring
logger *grpclog.PrefixLogger
}
func newPicker(ring *ring, logger *grpclog.PrefixLogger) *picker {
return &picker{ring: ring, logger: logger}
}
// handleRICSResult is the return type of handleRICS. It's needed to wrap the
@@ -47,7 +53,7 @@ type handleRICSResult struct {
// The first return value indicates if the state is in Ready, Idle, Connecting
// or Shutdown. If it's true, the PickResult and error should be returned from
// Pick() as is.
func handleRICS(e *ringEntry) (handleRICSResult, bool) {
func (p *picker) handleRICS(e *ringEntry) (handleRICSResult, bool) {
switch state := e.sc.effectiveState(); state {
case connectivity.Ready:
return handleRICSResult{pr: balancer.PickResult{SubConn: e.sc.sc}}, true
@@ -67,15 +73,14 @@ func handleRICS(e *ringEntry) (handleRICSResult, bool) {
default:
// Should never reach this. All the connectivity states are already
// handled in the cases.
//
// FIXME: add an error log.
p.logger.Errorf("SubConn has undefined connectivity state: %v", state)
return handleRICSResult{err: status.Errorf(codes.Unavailable, "SubConn has undefined connectivity state: %v", state)}, true
}
}
func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) {
e := p.ring.pick(getRequestHash(info.Ctx))
if hr, ok := handleRICS(e); ok {
if hr, ok := p.handleRICS(e); ok {
return hr.pr, hr.err
}
// ok was false, the entry is in transient failure.
@@ -95,7 +100,7 @@ func (p *picker) handleTransientFailure(e *ringEntry) (balancer.PickResult, erro
// For the second SubConn, also check Ready/Idle/Connecting as if it's the
// first entry.
if hr, ok := handleRICS(e2); ok {
if hr, ok := p.handleRICS(e2); ok {
return hr.pr, hr.err
}

View File

@@ -20,15 +20,50 @@
package ringhash
import (
"encoding/json"
"errors"
"fmt"
"sync"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
"google.golang.org/grpc/balancer/weightedroundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/internal/grpclog"
"google.golang.org/grpc/internal/pretty"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/serviceconfig"
)
// Name is the name of the ring_hash balancer.
const Name = "ring_hash_experimental"
func init() {
balancer.Register(bb{})
}
type bb struct{}
func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer {
b := &ringhashBalancer{
cc: cc,
subConns: make(map[resolver.Address]*subConn),
scStates: make(map[balancer.SubConn]*subConn),
csEvltr: &connectivityStateEvaluator{},
}
b.logger = prefixLogger(b)
b.logger.Infof("Created")
return b
}
func (bb) Name() string {
return Name
}
func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) {
return parseConfig(c)
}
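ParseConfig above defers to parseConfig, which lives in this package's config file and is not part of this diff. Based only on how LBConfig is used elsewhere in this commit (b.config.MinRingSize / b.config.MaxRingSize, and testConfig in the tests), a rough sketch of that type and its JSON parsing could look like the following; the JSON field names and the lack of validation are assumptions, not taken from the real config file:

package ringhash

import (
	"encoding/json"
	"fmt"

	"google.golang.org/grpc/serviceconfig"
)

// LBConfig mirrors the fields this commit reads; the json tags are guesses.
type LBConfig struct {
	serviceconfig.LoadBalancingConfig `json:"-"`

	MinRingSize uint64 `json:"minRingSize,omitempty"`
	MaxRingSize uint64 `json:"maxRingSize,omitempty"`
}

// parseConfig unmarshals the raw service config JSON into an LBConfig. The
// real implementation presumably also validates the ring size bounds.
func parseConfig(c json.RawMessage) (*LBConfig, error) {
	var cfg LBConfig
	if err := json.Unmarshal(c, &cfg); err != nil {
		return nil, fmt.Errorf("ring_hash: unable to unmarshal LBConfig: %v", err)
	}
	return &cfg, nil
}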
type subConn struct {
addr string
sc balancer.SubConn
@@ -37,26 +72,25 @@ type subConn struct {
// This is the actual state of this SubConn (as updated by the ClientConn).
// The effective state can be different, see comment of attemptedToConnect.
state connectivity.State
// attemptedToConnect is whether this SubConn has attempted to connect ever.
// So that only the initial Idle is Idle, after any attempt to connect,
// following Idles are all TransientFailure.
// failing is whether this SubConn is in a failing state. A subConn is
// considered to be in a failing state if it was previously in
// TransientFailure.
//
// This affects the effective connectivity state of this SubConn, e.g. if
// the actual state is Idle, but this SubConn has attempted to connect, the
// effective state is TransientFailure.
// This affects the effective connectivity state of this SubConn, e.g.
// - if the actual state is Idle or Connecting, but this SubConn is failing,
// the effective state is TransientFailure.
//
// This is used in pick(). E.g. if a subConn is Idle, but has
// attemptedToConnect as true, pick() will
// This is used in pick(). E.g. if a subConn is Idle, but has failing as
// true, pick() will
// - consider this SubConn as TransientFailure, and check the state of the
// next SubConn.
// - trigger Connect() (note that normally a SubConn in real
// TransientFailure cannot Connect())
//
// Note this should only be set when updating the state (from Idle to
// anything else), not when Connect() is called, because there's a small
// window after the first Connect(), before the state switches to something
// else.
attemptedToConnect bool
// A subConn starts in non-failing (failing is false). A transition to
// TransientFailure sets failing to true (and it stays true). A transition
// to Ready sets failing to false.
failing bool
// connectQueued is true if a Connect() was queued for this SubConn while
// it's not in Idle (most likely was in TransientFailure). A Connect() will
// be triggered on this SubConn when it turns Idle.
@@ -66,20 +100,13 @@ type subConn struct {
connectQueued bool
}
// SetState updates the state of this SubConn.
// setState updates the state of this SubConn.
//
// It also handles the queued Connect(). If the new state is Idle, and a
// Connect() was queued, this SubConn will be triggered to Connect().
//
// FIXME: unexport this. It's exported so that staticcheck doesn't complain
// about unused functions.
func (sc *subConn) SetState(s connectivity.State) {
func (sc *subConn) setState(s connectivity.State) {
sc.mu.Lock()
defer sc.mu.Unlock()
// Any state change to non-Idle means there was an attempt to connect.
if s != connectivity.Idle {
sc.attemptedToConnect = true
}
switch s {
case connectivity.Idle:
// Trigger Connect() if new state is Idle, and there is a queued connect.
@@ -87,21 +114,30 @@ func (sc *subConn) SetState(s connectivity.State) {
sc.connectQueued = false
sc.sc.Connect()
}
case connectivity.Connecting, connectivity.Ready:
case connectivity.Connecting:
// Clear connectQueued if the SubConn isn't failing. This state
// transition is unlikely to happen, but handle this just in case.
sc.connectQueued = false
case connectivity.Ready:
// Clear connectQueued if the SubConn isn't failing. This state
// transition is unlikely to happen, but handle this just in case.
sc.connectQueued = false
// Set to a non-failing state.
sc.failing = false
case connectivity.TransientFailure:
// Set to a failing state.
sc.failing = true
}
sc.state = s
}
// effectiveState returns the effective state of this SubConn. It can be
// different from the actual state, e.g. Idle after any attempt to connect (any
// Idle other than the initial Idle) is considered TransientFailure.
// different from the actual state, e.g. Idle while the subConn is failing is
// considered TransientFailure. Read comment of field failing for other cases.
func (sc *subConn) effectiveState() connectivity.State {
sc.mu.RLock()
defer sc.mu.RUnlock()
if sc.state == connectivity.Idle && sc.attemptedToConnect {
if sc.failing && (sc.state == connectivity.Idle || sc.state == connectivity.Connecting) {
return connectivity.TransientFailure
}
return sc.state
@@ -121,3 +157,278 @@ func (sc *subConn) queueConnect() {
// after backoff in TransientFailure), it will Connect().
sc.connectQueued = true
}
type ringhashBalancer struct {
cc balancer.ClientConn
logger *grpclog.PrefixLogger
config *LBConfig
subConns map[resolver.Address]*subConn // `attributes` is stripped from the keys of this map (the addresses)
scStates map[balancer.SubConn]*subConn
// ring is always in sync with subConns. When subConns change, a new ring is
// generated. Note that address weights updates (they are keys in the
// subConns map) also regenerates the ring.
ring *ring
picker balancer.Picker
csEvltr *connectivityStateEvaluator
state connectivity.State
resolverErr error // the last error reported by the resolver; cleared on successful resolution
connErr error // the last connection error; cleared upon leaving TransientFailure
}
// updateAddresses creates new SubConns and removes SubConns, based on the
// address update.
//
// The return value is whether the new address list is different from the
// previous. True if
// - an address was added
// - an address was removed
// - an address's weight was updated
//
// Note that this function doesn't trigger SubConn connecting, so all the new
// SubConn states are Idle.
func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool {
var addrsUpdated bool
// addrsSet is the set converted from addrs, it's used for quick lookup of
// an address.
//
// Addresses in this map all have attributes stripped, but Metadata set to
// the weight, so that weight changes can be detected.
//
// TODO: this won't be necessary if there are ways to compare address
// attributes.
addrsSet := make(map[resolver.Address]struct{})
for _, a := range addrs {
aNoAttrs := a
// Strip attributes but set Metadata to the weight.
aNoAttrs.Attributes = nil
w := weightedroundrobin.GetAddrInfo(a).Weight
if w == 0 {
// If weight is not set, use 1.
w = 1
}
aNoAttrs.Metadata = w
addrsSet[aNoAttrs] = struct{}{}
if scInfo, ok := b.subConns[aNoAttrs]; !ok {
// When creating SubConn, the original address with attributes is
// passed through, so that connection configurations in attributes
// (like creds) will be used.
sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: true})
if err != nil {
b.logger.Warningf("failed to create new SubConn: %v", err)
continue
}
scs := &subConn{addr: a.Addr, sc: sc}
scs.setState(connectivity.Idle)
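// The evaluator has no previous state for a brand-new SubConn, so Shutdown
// is passed as the old state; its counter is never read when aggregating,
// so this simply counts the new Idle SubConn.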
b.state = b.csEvltr.recordTransition(connectivity.Shutdown, connectivity.Idle)
b.subConns[aNoAttrs] = scs
b.scStates[sc] = scs
addrsUpdated = true
} else {
// Always update the subconn's address in case the attributes
// changed. The SubConn does a reflect.DeepEqual of the new and old
// addresses. So this is a noop if the current address is the same
// as the old one (including attributes).
b.subConns[aNoAttrs] = scInfo
b.cc.UpdateAddresses(scInfo.sc, []resolver.Address{a})
}
}
for a, scInfo := range b.subConns {
// a was removed by resolver.
if _, ok := addrsSet[a]; !ok {
b.cc.RemoveSubConn(scInfo.sc)
delete(b.subConns, a)
addrsUpdated = true
// Keep the state of this sc in b.scStates until sc's state becomes Shutdown.
// The entry will be deleted in UpdateSubConnState.
}
}
return addrsUpdated
}
func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(s.BalancerConfig))
if b.config == nil {
newConfig, ok := s.BalancerConfig.(*LBConfig)
if !ok {
return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig)
}
b.config = newConfig
}
// Successful resolution; clear resolver error and ensure we return nil.
b.resolverErr = nil
if b.updateAddresses(s.ResolverState.Addresses) {
// If addresses were updated, no matter whether it resulted in SubConn
// creation/deletion, or just weight update, we will need to regenerate
// the ring.
var err error
b.ring, err = newRing(b.subConns, b.config.MinRingSize, b.config.MaxRingSize)
if err != nil {
panic(err)
}
b.regeneratePicker()
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
}
// If resolver state contains no addresses, return an error so ClientConn
// will trigger re-resolve. Also record this as a resolver error, so when
// the overall state turns transient failure, the error message will have
// the zero address information.
if len(s.ResolverState.Addresses) == 0 {
b.ResolverError(errors.New("produced zero addresses"))
return balancer.ErrBadResolverState
}
return nil
}
func (b *ringhashBalancer) ResolverError(err error) {
b.resolverErr = err
if len(b.subConns) == 0 {
b.state = connectivity.TransientFailure
}
if b.state != connectivity.TransientFailure {
// The picker will not change since the balancer does not currently
// report an error.
return
}
b.regeneratePicker()
b.cc.UpdateState(balancer.State{
ConnectivityState: b.state,
Picker: b.picker,
})
}
// UpdateSubConnState updates the per-SubConn state stored in the ring, and also
// the aggregated state.
//
// It triggers an update to cc when:
// - the new state is TransientFailure, to update the error message
// - it's possible that this is a noop, but sending an extra update is easier
// than comparing errors
// - the aggregated state is changed
// - the same picker will be sent again, but this update may trigger a re-pick
// for some RPCs.
func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
s := state.ConnectivityState
b.logger.Infof("handle SubConn state change: %p, %v", sc, s)
scs, ok := b.scStates[sc]
if !ok {
b.logger.Infof("got state changes for an unknown SubConn: %p, %v", sc, s)
return
}
oldSCState := scs.effectiveState()
scs.setState(s)
newSCState := scs.effectiveState()
var sendUpdate bool
oldBalancerState := b.state
b.state = b.csEvltr.recordTransition(oldSCState, newSCState)
if oldBalancerState != b.state {
sendUpdate = true
}
switch s {
case connectivity.Idle:
// When the overall state is TransientFailure, this will never get picks
// if there's a lower priority. Need to keep the SubConns connecting so
// there's a chance it will recover.
if b.state == connectivity.TransientFailure {
scs.queueConnect()
}
// No need to send an update. No queued RPC can be unblocked. If the
// overall state changed because of this, sendUpdate is already true.
case connectivity.Connecting:
// No need to send an update. No queued RPC can be unblocked. If the
// overall state changed because of this, sendUpdate is already true.
case connectivity.Ready:
// Resend the picker, there's no need to regenerate the picker because
// the ring didn't change.
sendUpdate = true
case connectivity.TransientFailure:
// Save error to be reported via picker.
b.connErr = state.ConnectionError
// Regenerate picker to update error message.
b.regeneratePicker()
sendUpdate = true
case connectivity.Shutdown:
// When an address was removed by resolver, b called RemoveSubConn but
// kept the sc's state in scStates. Remove state for this sc here.
delete(b.scStates, sc)
}
if sendUpdate {
b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker})
}
}
// mergeErrors builds an error from the last connection error and the last
// resolver error. Must only be called if b.state is TransientFailure.
func (b *ringhashBalancer) mergeErrors() error {
// connErr must always be non-nil unless there are no SubConns, in which
// case resolverErr must be non-nil.
if b.connErr == nil {
return fmt.Errorf("last resolver error: %v", b.resolverErr)
}
if b.resolverErr == nil {
return fmt.Errorf("last connection error: %v", b.connErr)
}
return fmt.Errorf("last connection error: %v; last resolver error: %v", b.connErr, b.resolverErr)
}
func (b *ringhashBalancer) regeneratePicker() {
if b.state == connectivity.TransientFailure {
b.picker = base.NewErrPicker(b.mergeErrors())
return
}
b.picker = newPicker(b.ring, b.logger)
}
func (b *ringhashBalancer) Close() {}
// connectivityStateEvaluator takes the connectivity states of multiple SubConns
// and returns one aggregated connectivity state.
//
// It's not thread safe.
type connectivityStateEvaluator struct {
nums [5]uint64
}
// recordTransition records state change happening in subConn and based on that
// it evaluates what aggregated state should be.
//
// - If there is at least one subchannel in READY state, report READY.
// - If there are 2 or more subchannels in TRANSIENT_FAILURE state, report TRANSIENT_FAILURE.
// - If there is at least one subchannel in CONNECTING state, report CONNECTING.
// - If there is at least one subchannel in Idle state, report Idle.
// - Otherwise, report TRANSIENT_FAILURE.
//
// Note that with 1 subchannel connecting and 2 in transient failure, the overall state
// is transient failure. This is because the second transient failure is a
// fallback of the first failing SubConn, and we want to report transient
// failure to failover to the lower priority.
func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State {
// Update counters.
for idx, state := range []connectivity.State{oldState, newState} {
updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new.
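// For idx == 0 (the old state), 2*0-1 underflows to the maximum uint64;
// adding that to the counter below wraps around, which is effectively -1.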
cse.nums[state] += updateVal
}
if cse.nums[connectivity.Ready] > 0 {
return connectivity.Ready
}
if cse.nums[connectivity.TransientFailure] > 1 {
return connectivity.TransientFailure
}
if cse.nums[connectivity.Connecting] > 0 {
return connectivity.Connecting
}
if cse.nums[connectivity.Idle] > 0 {
return connectivity.Idle
}
return connectivity.TransientFailure
}

View File

@@ -19,10 +19,18 @@
package ringhash
import (
"context"
"fmt"
"testing"
"time"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"google.golang.org/grpc/attributes"
"google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/weightedroundrobin"
"google.golang.org/grpc/connectivity"
"google.golang.org/grpc/resolver"
"google.golang.org/grpc/xds/internal/testutils"
)
@@ -34,5 +42,417 @@ var (
)
const (
defaultTestTimeout = 10 * time.Second
defaultTestShortTimeout = 10 * time.Millisecond
testBackendAddrsCount = 12
)
var (
testBackendAddrStrs []string
testConfig = &LBConfig{MinRingSize: 1, MaxRingSize: 10}
)
func init() {
for i := 0; i < testBackendAddrsCount; i++ {
testBackendAddrStrs = append(testBackendAddrStrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i))
}
}
func ctxWithHash(h uint64) context.Context {
return SetRequestHash(context.Background(), h)
}
// setupTest creates the balancer, and does an initial sanity check.
func setupTest(t *testing.T, addrs []resolver.Address) (*testutils.TestClientConn, balancer.Balancer, balancer.Picker) {
t.Helper()
cc := testutils.NewTestClientConn(t)
builder := balancer.Get(Name)
b := builder.Build(cc, balancer.BuildOptions{})
if b == nil {
t.Fatalf("builder.Build(%s) failed and returned nil", Name)
}
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: addrs},
BalancerConfig: testConfig,
}); err != nil {
t.Fatalf("UpdateClientConnState returned err: %v", err)
}
for _, addr := range addrs {
addr1 := <-cc.NewSubConnAddrsCh
if want := []resolver.Address{addr}; !cmp.Equal(addr1, want, cmp.AllowUnexported(attributes.Attributes{})) {
t.Fatalf("got unexpected new subconn addrs: %v", cmp.Diff(addr1, want, cmp.AllowUnexported(attributes.Attributes{})))
}
sc1 := <-cc.NewSubConnCh
// All the SubConns start in Idle, and should not Connect().
select {
case <-sc1.(*testutils.TestSubConn).ConnectCh:
t.Errorf("unexpected Connect() from SubConn %v", sc1)
case <-time.After(defaultTestShortTimeout):
}
}
// Should also have a picker, with all SubConns in Idle.
p1 := <-cc.NewPickerCh
return cc, b, p1
}
func TestOneSubConn(t *testing.T) {
wantAddr1 := resolver.Address{Addr: testBackendAddrStrs[0]}
cc, b, p0 := setupTest(t, []resolver.Address{wantAddr1})
ring0 := p0.(*picker).ring
firstHash := ring0.items[0].hash
// firstHash-1 will pick the first (and only) SubConn from the ring.
testHash := firstHash - 1
// The first pick should be queued, and should trigger Connect() on the only
// SubConn.
if _, err := p0.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}); err != balancer.ErrNoSubConnAvailable {
t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable)
}
sc0 := ring0.items[0].sc.sc
select {
case <-sc0.(*testutils.TestSubConn).ConnectCh:
case <-time.After(defaultTestTimeout):
t.Errorf("timeout waiting for Connect() from SubConn %v", sc0)
}
// Send state updates to Ready.
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// Test pick with one backend.
p1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0)
}
}
}
// TestThreeSubConnsAffinity covers that with 3 SubConns, RPCs with the same
// hash always pick the same SubConn. When the one picked is down, another one
// will be picked.
func TestThreeSubConnsAffinity(t *testing.T) {
wantAddrs := []resolver.Address{
{Addr: testBackendAddrStrs[0]},
{Addr: testBackendAddrStrs[1]},
{Addr: testBackendAddrStrs[2]},
}
cc, b, p0 := setupTest(t, wantAddrs)
// This test doesn't update addresses, so this ring will be used by all the
// pickers.
ring0 := p0.(*picker).ring
firstHash := ring0.items[0].hash
// firstHash+1 will pick the second SubConn from the ring.
testHash := firstHash + 1
// The first pick should be queued, and should trigger Connect() on the
// picked SubConn.
if _, err := p0.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}); err != balancer.ErrNoSubConnAvailable {
t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable)
}
// The picked SubConn should be the second in the ring.
sc0 := ring0.items[1].sc.sc
select {
case <-sc0.(*testutils.TestSubConn).ConnectCh:
case <-time.After(defaultTestTimeout):
t.Errorf("timeout waiting for Connect() from SubConn %v", sc0)
}
// Send state updates to Ready.
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
p1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0)
}
}
// Turn down the subConn in use.
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
p2 := <-cc.NewPickerCh
// A pick with the same hash should be queued, because the SubConn after the
// first picked one is Idle.
if _, err := p2.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}); err != balancer.ErrNoSubConnAvailable {
t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable)
}
// The third SubConn in the ring should connect.
sc1 := ring0.items[2].sc.sc
select {
case <-sc1.(*testutils.TestSubConn).ConnectCh:
case <-time.After(defaultTestTimeout):
t.Errorf("timeout waiting for Connect() from SubConn %v", sc1)
}
// Send state updates to Ready.
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// New picks should all return this SubConn.
p3 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p3.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
}
}
// Now, after backoff, the first picked SubConn will turn Idle.
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Idle})
// The picks above should have queued Connect() for the first picked
// SubConn, so this Idle state change will trigger a Connect().
select {
case <-sc0.(*testutils.TestSubConn).ConnectCh:
case <-time.After(defaultTestTimeout):
t.Errorf("timeout waiting for Connect() from SubConn %v", sc0)
}
// After the first picked SubConn turns Ready, new picks should return it
// again (even though the second picked SubConn is also Ready).
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
p4 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p4.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0)
}
}
}
// TestThreeSubConnsAffinityMultiple covers that with 3 SubConns, RPCs with
// the same hash always pick the same SubConn. Then a different hash is used to
// pick another backend, and the first hash is verified to still pick the first
// backend.
func TestThreeSubConnsAffinityMultiple(t *testing.T) {
wantAddrs := []resolver.Address{
{Addr: testBackendAddrStrs[0]},
{Addr: testBackendAddrStrs[1]},
{Addr: testBackendAddrStrs[2]},
}
cc, b, p0 := setupTest(t, wantAddrs)
// This test doesn't update addresses, so this ring will be used by all the
// pickers.
ring0 := p0.(*picker).ring
firstHash := ring0.items[0].hash
// firstHash+1 will pick the second SubConn from the ring.
testHash := firstHash + 1
// The first pick should be queued, and should trigger Connect() on the
// picked SubConn.
if _, err := p0.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}); err != balancer.ErrNoSubConnAvailable {
t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable)
}
sc0 := ring0.items[1].sc.sc
select {
case <-sc0.(*testutils.TestSubConn).ConnectCh:
case <-time.After(defaultTestTimeout):
t.Errorf("timeout waiting for Connect() from SubConn %v", sc0)
}
// Send state updates to Ready.
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// First hash should always pick sc0.
p1 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p1.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0)
}
}
secondHash := ring0.items[1].hash
// secondHash+1 will pick the third SubConn from the ring.
testHash2 := secondHash + 1
if _, err := p0.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash2)}); err != balancer.ErrNoSubConnAvailable {
t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable)
}
sc1 := ring0.items[2].sc.sc
select {
case <-sc1.(*testutils.TestSubConn).ConnectCh:
case <-time.After(defaultTestTimeout):
t.Errorf("timeout waiting for Connect() from SubConn %v", sc1)
}
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// With the newly generated picker, hash2 always picks sc1.
p2 := <-cc.NewPickerCh
for i := 0; i < 5; i++ {
gotSCSt, _ := p2.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash2)})
if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
}
}
// But the first hash still picks sc0.
for i := 0; i < 5; i++ {
gotSCSt, _ := p2.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) {
t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0)
}
}
}
func TestAddrWeightChange(t *testing.T) {
wantAddrs := []resolver.Address{
{Addr: testBackendAddrStrs[0]},
{Addr: testBackendAddrStrs[1]},
{Addr: testBackendAddrStrs[2]},
}
cc, b, p0 := setupTest(t, wantAddrs)
ring0 := p0.(*picker).ring
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: wantAddrs},
BalancerConfig: nil,
}); err != nil {
t.Fatalf("UpdateClientConnState returned err: %v", err)
}
select {
case <-cc.NewPickerCh:
t.Fatalf("unexpected picker after UpdateClientConn with the same addresses")
case <-time.After(defaultTestShortTimeout):
}
// Delete an address, should send a new Picker.
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
{Addr: testBackendAddrStrs[0]},
{Addr: testBackendAddrStrs[1]},
}},
BalancerConfig: nil,
}); err != nil {
t.Fatalf("UpdateClientConnState returned err: %v", err)
}
var p1 balancer.Picker
select {
case p1 = <-cc.NewPickerCh:
case <-time.After(defaultTestTimeout):
t.Fatalf("timeout waiting for picker after UpdateClientConn with different addresses")
}
ring1 := p1.(*picker).ring
if ring1 == ring0 {
t.Fatalf("new picker after removing address has the same ring as before, want different")
}
// Another update with the same addresses, but different weight.
if err := b.UpdateClientConnState(balancer.ClientConnState{
ResolverState: resolver.State{Addresses: []resolver.Address{
{Addr: testBackendAddrStrs[0]},
weightedroundrobin.SetAddrInfo(
resolver.Address{Addr: testBackendAddrStrs[1]},
weightedroundrobin.AddrInfo{Weight: 2}),
}},
BalancerConfig: nil,
}); err != nil {
t.Fatalf("UpdateClientConnState returned err: %v", err)
}
var p2 balancer.Picker
select {
case p2 = <-cc.NewPickerCh:
case <-time.After(defaultTestTimeout):
t.Fatalf("timeout waiting for picker after UpdateClientConn with different addresses")
}
if p2.(*picker).ring == ring1 {
t.Fatalf("new picker after changing address weight has the same ring as before, want different")
}
}
// TestSubConnToConnectWhenOverallTransientFailure covers that when the
// overall state is TransientFailure, SubConns turning Idle are triggered to
// Connect(), but not when the overall state is not TransientFailure.
func TestSubConnToConnectWhenOverallTransientFailure(t *testing.T) {
wantAddrs := []resolver.Address{
{Addr: testBackendAddrStrs[0]},
{Addr: testBackendAddrStrs[1]},
{Addr: testBackendAddrStrs[2]},
}
_, b, p0 := setupTest(t, wantAddrs)
ring0 := p0.(*picker).ring
// Turn all SubConns to TransientFailure.
for _, it := range ring0.items {
b.UpdateSubConnState(it.sc.sc, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
}
// The next one turning Idle should Connect().
sc0 := ring0.items[0].sc.sc
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Idle})
select {
case <-sc0.(*testutils.TestSubConn).ConnectCh:
case <-time.After(defaultTestTimeout):
t.Errorf("timeout waiting for Connect() from SubConn %v", sc0)
}
// Once this SubConn is Ready, other SubConns turning Idle will not Connect().
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
// This SubConn turning Idle should not trigger a Connect().
sc1 := ring0.items[1].sc.sc
b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Idle})
select {
case <-sc1.(*testutils.TestSubConn).ConnectCh:
t.Errorf("unexpected Connect() from SubConn %v", sc1)
case <-time.After(defaultTestShortTimeout):
}
}
func TestConnectivityStateEvaluatorRecordTransition(t *testing.T) {
tests := []struct {
name string
from, to []connectivity.State
want connectivity.State
}{
{
name: "one ready",
from: []connectivity.State{connectivity.Idle},
to: []connectivity.State{connectivity.Ready},
want: connectivity.Ready,
},
{
name: "one connecting",
from: []connectivity.State{connectivity.Idle},
to: []connectivity.State{connectivity.Connecting},
want: connectivity.Connecting,
},
{
name: "one ready one transient failure",
from: []connectivity.State{connectivity.Idle, connectivity.Idle},
to: []connectivity.State{connectivity.Ready, connectivity.TransientFailure},
want: connectivity.Ready,
},
{
name: "one connecting one transient failure",
from: []connectivity.State{connectivity.Idle, connectivity.Idle},
to: []connectivity.State{connectivity.Connecting, connectivity.TransientFailure},
want: connectivity.Connecting,
},
{
name: "one connecting two transient failure",
from: []connectivity.State{connectivity.Idle, connectivity.Idle, connectivity.Idle},
to: []connectivity.State{connectivity.Connecting, connectivity.TransientFailure, connectivity.TransientFailure},
want: connectivity.TransientFailure,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
cse := &connectivityStateEvaluator{}
var got connectivity.State
for i, fff := range tt.from {
ttt := tt.to[i]
got = cse.recordTransition(fff, ttt)
}
if got != tt.want {
t.Errorf("recordTransition() = %v, want %v", got, tt.want)
}
})
}
}

View File

@@ -0,0 +1,136 @@
//go:build !386
// +build !386
/*
*
* Copyright 2021 gRPC authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package xds_test
import (
"context"
"fmt"
"testing"
v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3"
v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/internal/xds/env"
testpb "google.golang.org/grpc/test/grpc_testing"
"google.golang.org/grpc/xds/internal/testutils/e2e"
)
const hashHeaderName = "session_id"
// hashRouteConfig returns a RouteConfig resource with hash policy set to
// header "session_id".
func hashRouteConfig(routeName, ldsTarget, clusterName string) *v3routepb.RouteConfiguration {
return &v3routepb.RouteConfiguration{
Name: routeName,
VirtualHosts: []*v3routepb.VirtualHost{{
Domains: []string{ldsTarget},
Routes: []*v3routepb.Route{{
Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}},
Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{
ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName},
HashPolicy: []*v3routepb.RouteAction_HashPolicy{{
PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{
Header: &v3routepb.RouteAction_HashPolicy_Header{
HeaderName: hashHeaderName,
},
},
Terminal: true,
}},
}},
}},
}},
}
}
// ringhashCluster returns a Cluster resource that picks ringhash as the lb
// policy.
func ringhashCluster(clusterName, edsServiceName string) *v3clusterpb.Cluster {
return &v3clusterpb.Cluster{
Name: clusterName,
ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS},
EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{
EdsConfig: &v3corepb.ConfigSource{
ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{
Ads: &v3corepb.AggregatedConfigSource{},
},
},
ServiceName: edsServiceName,
},
LbPolicy: v3clusterpb.Cluster_RING_HASH,
}
}
// TestClientSideAffinitySanityCheck tests that the affinity config can be
// propagated to pick the ring_hash policy. It doesn't test the affinity
// behavior in ring_hash policy.
func (s) TestClientSideAffinitySanityCheck(t *testing.T) {
defer func() func() {
old := env.RingHashSupport
env.RingHashSupport = true
return func() { env.RingHashSupport = old }
}()()
managementServer, nodeID, _, resolver, cleanup1 := setupManagementServer(t)
defer cleanup1()
port, cleanup2 := clientSetup(t, &testService{})
defer cleanup2()
const serviceName = "my-service-client-side-xds"
resources := e2e.DefaultClientResources(e2e.ResourceParams{
DialTarget: serviceName,
NodeID: nodeID,
Host: "localhost",
Port: port,
SecLevel: e2e.SecurityLevelNone,
})
// Replace RDS and CDS resources with ringhash config, but keep the resource
// names.
resources.Routes = []*v3routepb.RouteConfiguration{hashRouteConfig(
resources.Routes[0].Name,
resources.Listeners[0].Name,
resources.Clusters[0].Name,
)}
resources.Clusters = []*v3clusterpb.Cluster{ringhashCluster(
resources.Clusters[0].Name,
resources.Clusters[0].EdsClusterConfig.ServiceName,
)}
ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout)
defer cancel()
if err := managementServer.Update(ctx, resources); err != nil {
t.Fatal(err)
}
// Create a ClientConn and make a successful RPC.
cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver))
if err != nil {
t.Fatalf("failed to dial local test server: %v", err)
}
defer cc.Close()
client := testpb.NewTestServiceClient(cc)
if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil {
t.Fatalf("rpc EmptyCall() failed: %v", err)
}
}
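The sanity check above only verifies that the ring_hash config is propagated; it does not exercise affinity itself. For illustration, a client that wants its requests to stick to one backend under this route config would attach the hashed header to each RPC via gRPC metadata. This is a sketch that would live in the same test package (so hashHeaderName is in scope); the "my-session" value is an arbitrary placeholder:

import (
	"context"

	"google.golang.org/grpc/metadata"
	testpb "google.golang.org/grpc/test/grpc_testing"
)

// callWithAffinity sends an RPC carrying the "session_id" header. RPCs with
// the same header value hash to the same point on the ring, and therefore to
// the same backend, as long as that backend stays reachable.
func callWithAffinity(ctx context.Context, client testpb.TestServiceClient) error {
	ctx = metadata.AppendToOutgoingContext(ctx, hashHeaderName, "my-session")
	_, err := client.EmptyCall(ctx, &testpb.Empty{})
	return err
}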