From 4f093b9a5afa5f3c8f29774dbdce8c02ce516d70 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Wed, 15 Sep 2021 14:47:18 -0700 Subject: [PATCH] ringhash: the balancer (#4741) --- xds/internal/balancer/ringhash/logging.go | 34 ++ xds/internal/balancer/ringhash/picker.go | 17 +- xds/internal/balancer/ringhash/ringhash.go | 363 +++++++++++++-- .../balancer/ringhash/ringhash_test.go | 420 ++++++++++++++++++ xds/internal/test/xds_client_affinity_test.go | 136 ++++++ 5 files changed, 938 insertions(+), 32 deletions(-) create mode 100644 xds/internal/balancer/ringhash/logging.go create mode 100644 xds/internal/test/xds_client_affinity_test.go diff --git a/xds/internal/balancer/ringhash/logging.go b/xds/internal/balancer/ringhash/logging.go new file mode 100644 index 000000000..64a1d467f --- /dev/null +++ b/xds/internal/balancer/ringhash/logging.go @@ -0,0 +1,34 @@ +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package ringhash + +import ( + "fmt" + + "google.golang.org/grpc/grpclog" + internalgrpclog "google.golang.org/grpc/internal/grpclog" +) + +const prefix = "[ring-hash-lb %p] " + +var logger = grpclog.Component("xds") + +func prefixLogger(p *ringhashBalancer) *internalgrpclog.PrefixLogger { + return internalgrpclog.NewPrefixLogger(logger, fmt.Sprintf(prefix, p)) +} diff --git a/xds/internal/balancer/ringhash/picker.go b/xds/internal/balancer/ringhash/picker.go index 6d035b0c1..dcea6d46e 100644 --- a/xds/internal/balancer/ringhash/picker.go +++ b/xds/internal/balancer/ringhash/picker.go @@ -24,11 +24,17 @@ import ( "google.golang.org/grpc/balancer" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/internal/grpclog" "google.golang.org/grpc/status" ) type picker struct { - ring *ring + ring *ring + logger *grpclog.PrefixLogger +} + +func newPicker(ring *ring, logger *grpclog.PrefixLogger) *picker { + return &picker{ring: ring, logger: logger} } // handleRICSResult is the return type of handleRICS. It's needed to wrap the @@ -47,7 +53,7 @@ type handleRICSResult struct { // The first return value indicates if the state is in Ready, Idle, Connecting // or Shutdown. If it's true, the PickResult and error should be returned from // Pick() as is. -func handleRICS(e *ringEntry) (handleRICSResult, bool) { +func (p *picker) handleRICS(e *ringEntry) (handleRICSResult, bool) { switch state := e.sc.effectiveState(); state { case connectivity.Ready: return handleRICSResult{pr: balancer.PickResult{SubConn: e.sc.sc}}, true @@ -67,15 +73,14 @@ func handleRICS(e *ringEntry) (handleRICSResult, bool) { default: // Should never reach this. All the connectivity states are already // handled in the cases. - // - // FIXME: add an error log. 
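
The line added next replaces the FIXME above with an error log through the balancer's prefix logger from logging.go earlier in this patch. As background, the prefix is just the instance pointer baked into every line; a self-contained sketch of the effect (plain fmt stands in for the internal grpclog wrapper, which is not shown here):

	package main

	import "fmt"

	type ringhashBalancer struct{}

	func main() {
		b := &ringhashBalancer{}
		// Mirrors prefixLogger in logging.go: every line is tagged with the
		// instance's address, so logs from concurrently running ring_hash
		// balancers can be told apart.
		prefix := fmt.Sprintf("[ring-hash-lb %p] ", b)
		fmt.Println(prefix + "Created")
	}
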
+ p.logger.Errorf("SubConn has undefined connectivity state: %v", state) return handleRICSResult{err: status.Errorf(codes.Unavailable, "SubConn has undefined connectivity state: %v", state)}, true } } func (p *picker) Pick(info balancer.PickInfo) (balancer.PickResult, error) { e := p.ring.pick(getRequestHash(info.Ctx)) - if hr, ok := handleRICS(e); ok { + if hr, ok := p.handleRICS(e); ok { return hr.pr, hr.err } // ok was false, the entry is in transient failure. @@ -95,7 +100,7 @@ func (p *picker) handleTransientFailure(e *ringEntry) (balancer.PickResult, erro // For the second SubConn, also check Ready/Idle/Connecting as if it's the // first entry. - if hr, ok := handleRICS(e2); ok { + if hr, ok := p.handleRICS(e2); ok { return hr.pr, hr.err } diff --git a/xds/internal/balancer/ringhash/ringhash.go b/xds/internal/balancer/ringhash/ringhash.go index b87cce648..f8a47f165 100644 --- a/xds/internal/balancer/ringhash/ringhash.go +++ b/xds/internal/balancer/ringhash/ringhash.go @@ -20,15 +20,50 @@ package ringhash import ( + "encoding/json" + "errors" + "fmt" "sync" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/base" + "google.golang.org/grpc/balancer/weightedroundrobin" "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/internal/grpclog" + "google.golang.org/grpc/internal/pretty" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/serviceconfig" ) // Name is the name of the ring_hash balancer. const Name = "ring_hash_experimental" +func init() { + balancer.Register(bb{}) +} + +type bb struct{} + +func (bb) Build(cc balancer.ClientConn, bOpts balancer.BuildOptions) balancer.Balancer { + b := &ringhashBalancer{ + cc: cc, + subConns: make(map[resolver.Address]*subConn), + scStates: make(map[balancer.SubConn]*subConn), + csEvltr: &connectivityStateEvaluator{}, + } + b.logger = prefixLogger(b) + b.logger.Infof("Created") + return b +} + +func (bb) Name() string { + return Name +} + +func (bb) ParseConfig(c json.RawMessage) (serviceconfig.LoadBalancingConfig, error) { + return parseConfig(c) +} + type subConn struct { addr string sc balancer.SubConn @@ -37,26 +72,25 @@ type subConn struct { // This is the actual state of this SubConn (as updated by the ClientConn). // The effective state can be different, see comment of attemptedToConnect. state connectivity.State - // attemptedToConnect is whether this SubConn has attempted to connect ever. - // So that only the initial Idle is Idle, after any attempt to connect, - // following Idles are all TransientFailure. + // failing is whether this SubConn is in a failing state. A subConn is + // considered to be in a failing state if it was previously in + // TransientFailure. // - // This affects the effective connectivity state of this SubConn, e.g. if - // the actual state is Idle, but this SubConn has attempted to connect, the - // effective state is TransientFailure. + // This affects the effective connectivity state of this SubConn, e.g. + // - if the actual state is Idle or Connecting, but this SubConn is failing, + // the effective state is TransientFailure. // - // This is used in pick(). E.g. if a subConn is Idle, but has - // attemptedToConnect as true, pick() will + // This is used in pick(). E.g. if a subConn is Idle, but has failing as + // true, pick() will // - consider this SubConn as TransientFailure, and check the state of the // next SubConn. 
// - trigger Connect() (note that normally a SubConn in real // TransientFailure cannot Connect()) // - // Note this should only be set when updating the state (from Idle to - // anything else), not when Connect() is called, because there's a small - // window after the first Connect(), before the state switches to something - // else. - attemptedToConnect bool + // A subConn starts in non-failing (failing is false). A transition to + // TransientFailure sets failing to true (and it stays true). A transition + // to Ready sets failing to false. + failing bool // connectQueued is true if a Connect() was queued for this SubConn while // it's not in Idle (most likely was in TransientFailure). A Connect() will // be triggered on this SubConn when it turns Idle. @@ -66,20 +100,13 @@ type subConn struct { connectQueued bool } -// SetState updates the state of this SubConn. +// setState updates the state of this SubConn. // // It also handles the queued Connect(). If the new state is Idle, and a // Connect() was queued, this SubConn will be triggered to Connect(). -// -// FIXME: unexport this. It's exported so that staticcheck doesn't complain -// about unused functions. -func (sc *subConn) SetState(s connectivity.State) { +func (sc *subConn) setState(s connectivity.State) { sc.mu.Lock() defer sc.mu.Unlock() - // Any state change to non-Idle means there was an attempt to connect. - if s != connectivity.Idle { - sc.attemptedToConnect = true - } switch s { case connectivity.Idle: // Trigger Connect() if new state is Idle, and there is a queued connect. @@ -87,21 +114,30 @@ func (sc *subConn) SetState(s connectivity.State) { sc.connectQueued = false sc.sc.Connect() } - case connectivity.Connecting, connectivity.Ready: + case connectivity.Connecting: // Clear connectQueued if the SubConn isn't failing. This state // transition is unlikely to happen, but handle this just in case. sc.connectQueued = false + case connectivity.Ready: + // Clear connectQueued if the SubConn isn't failing. This state + // transition is unlikely to happen, but handle this just in case. + sc.connectQueued = false + // Set to a non-failing state. + sc.failing = false + case connectivity.TransientFailure: + // Set to a failing state. + sc.failing = true } sc.state = s } // effectiveState returns the effective state of this SubConn. It can be -// different from the actual state, e.g. Idle after any attempt to connect (any -// Idle other than the initial Idle) is considered TransientFailure. +// different from the actual state, e.g. Idle while the subConn is failing is +// considered TransientFailure. Read comment of field failing for other cases. func (sc *subConn) effectiveState() connectivity.State { sc.mu.RLock() defer sc.mu.RUnlock() - if sc.state == connectivity.Idle && sc.attemptedToConnect { + if sc.failing && (sc.state == connectivity.Idle || sc.state == connectivity.Connecting) { return connectivity.TransientFailure } return sc.state @@ -121,3 +157,278 @@ func (sc *subConn) queueConnect() { // after backoff in TransientFailure), it will Connect(). sc.connectQueued = true } + +type ringhashBalancer struct { + cc balancer.ClientConn + logger *grpclog.PrefixLogger + + config *LBConfig + + subConns map[resolver.Address]*subConn // `attributes` is stripped from the keys of this map (the addresses) + scStates map[balancer.SubConn]*subConn + + // ring is always in sync with subConns. When subConns change, a new ring is + // generated. 
Note that address weights updates (they are keys in the + // subConns map) also regenerates the ring. + ring *ring + picker balancer.Picker + csEvltr *connectivityStateEvaluator + state connectivity.State + + resolverErr error // the last error reported by the resolver; cleared on successful resolution + connErr error // the last connection error; cleared upon leaving TransientFailure +} + +// updateAddresses creates new SubConns and removes SubConns, based on the +// address update. +// +// The return value is whether the new address list is different from the +// previous. True if +// - an address was added +// - an address was removed +// - an address's weight was updated +// +// Note that this function doesn't trigger SubConn connecting, so all the new +// SubConn states are Idle. +func (b *ringhashBalancer) updateAddresses(addrs []resolver.Address) bool { + var addrsUpdated bool + // addrsSet is the set converted from addrs, it's used for quick lookup of + // an address. + // + // Addresses in this map all have attributes stripped, but metadata set to + // the weight. So that weight change can be detected. + // + // TODO: this won't be necessary if there are ways to compare address + // attributes. + addrsSet := make(map[resolver.Address]struct{}) + for _, a := range addrs { + aNoAttrs := a + // Strip attributes but set Metadata to the weight. + aNoAttrs.Attributes = nil + w := weightedroundrobin.GetAddrInfo(a).Weight + if w == 0 { + // If weight is not set, use 1. + w = 1 + } + aNoAttrs.Metadata = w + addrsSet[aNoAttrs] = struct{}{} + if scInfo, ok := b.subConns[aNoAttrs]; !ok { + // When creating SubConn, the original address with attributes is + // passed through. So that connection configurations in attributes + // (like creds) will be used. + sc, err := b.cc.NewSubConn([]resolver.Address{a}, balancer.NewSubConnOptions{HealthCheckEnabled: true}) + if err != nil { + logger.Warningf("base.baseBalancer: failed to create new SubConn: %v", err) + continue + } + scs := &subConn{addr: a.Addr, sc: sc} + scs.setState(connectivity.Idle) + b.state = b.csEvltr.recordTransition(connectivity.Shutdown, connectivity.Idle) + b.subConns[aNoAttrs] = scs + b.scStates[sc] = scs + addrsUpdated = true + } else { + // Always update the subconn's address in case the attributes + // changed. The SubConn does a reflect.DeepEqual of the new and old + // addresses. So this is a noop if the current address is the same + // as the old one (including attributes). + b.subConns[aNoAttrs] = scInfo + b.cc.UpdateAddresses(scInfo.sc, []resolver.Address{a}) + } + } + for a, scInfo := range b.subConns { + // a was removed by resolver. + if _, ok := addrsSet[a]; !ok { + b.cc.RemoveSubConn(scInfo.sc) + delete(b.subConns, a) + addrsUpdated = true + // Keep the state of this sc in b.scStates until sc's state becomes Shutdown. + // The entry will be deleted in UpdateSubConnState. + } + } + return addrsUpdated +} + +func (b *ringhashBalancer) UpdateClientConnState(s balancer.ClientConnState) error { + b.logger.Infof("Received update from resolver, balancer config: %+v", pretty.ToJSON(s.BalancerConfig)) + if b.config == nil { + newConfig, ok := s.BalancerConfig.(*LBConfig) + if !ok { + return fmt.Errorf("unexpected balancer config with type: %T", s.BalancerConfig) + } + b.config = newConfig + } + + // Successful resolution; clear resolver error and ensure we return nil. 
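
An aside on the weight handling in updateAddresses above: weights are read with the weightedroundrobin attribute helpers, the same API the xds resolvers use to attach endpoint weights. A minimal, self-contained sketch (the address and weight values are illustrative):

	package main

	import (
		"fmt"

		"google.golang.org/grpc/balancer/weightedroundrobin"
		"google.golang.org/grpc/resolver"
	)

	func main() {
		addr := resolver.Address{Addr: "10.0.0.1:443"}
		addr = weightedroundrobin.SetAddrInfo(addr, weightedroundrobin.AddrInfo{Weight: 3})
		// updateAddresses reads the weight back the same way, defaulting to 1
		// when it is unset.
		fmt.Println(weightedroundrobin.GetAddrInfo(addr).Weight) // 3
	}
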
+ b.resolverErr = nil + if b.updateAddresses(s.ResolverState.Addresses) { + // If addresses were updated, no matter whether it resulted in SubConn + // creation/deletion, or just weight update, we will need to regenerate + // the ring. + var err error + b.ring, err = newRing(b.subConns, b.config.MinRingSize, b.config.MaxRingSize) + if err != nil { + panic(err) + } + b.regeneratePicker() + b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker}) + } + + // If resolver state contains no addresses, return an error so ClientConn + // will trigger re-resolve. Also records this as an resolver error, so when + // the overall state turns transient failure, the error message will have + // the zero address information. + if len(s.ResolverState.Addresses) == 0 { + b.ResolverError(errors.New("produced zero addresses")) + return balancer.ErrBadResolverState + } + return nil +} + +func (b *ringhashBalancer) ResolverError(err error) { + b.resolverErr = err + if len(b.subConns) == 0 { + b.state = connectivity.TransientFailure + } + + if b.state != connectivity.TransientFailure { + // The picker will not change since the balancer does not currently + // report an error. + return + } + b.regeneratePicker() + b.cc.UpdateState(balancer.State{ + ConnectivityState: b.state, + Picker: b.picker, + }) +} + +// UpdateSubConnState updates the per-SubConn state stored in the ring, and also +// the aggregated state. +// +// It triggers an update to cc when: +// - the new state is TransientFailure, to update the error message +// - it's possible that this is a noop, but sending an extra update is easier +// than comparing errors +// - the aggregated state is changed +// - the same picker will be sent again, but this update may trigger a re-pick +// for some RPCs. +func (b *ringhashBalancer) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) { + s := state.ConnectivityState + b.logger.Infof("handle SubConn state change: %p, %v", sc, s) + scs, ok := b.scStates[sc] + if !ok { + b.logger.Infof("got state changes for an unknown SubConn: %p, %v", sc, s) + return + } + oldSCState := scs.effectiveState() + scs.setState(s) + newSCState := scs.effectiveState() + + var sendUpdate bool + oldBalancerState := b.state + b.state = b.csEvltr.recordTransition(oldSCState, newSCState) + if oldBalancerState != b.state { + sendUpdate = true + } + + switch s { + case connectivity.Idle: + // When the overall state is TransientFailure, this will never get picks + // if there's a lower priority. Need to keep the SubConns connecting so + // there's a chance it will recover. + if b.state == connectivity.TransientFailure { + scs.queueConnect() + } + // No need to send an update. No queued RPC can be unblocked. If the + // overall state changed because of this, sendUpdate is already true. + case connectivity.Connecting: + // No need to send an update. No queued RPC can be unblocked. If the + // overall state changed because of this, sendUpdate is already true. + case connectivity.Ready: + // Resend the picker, there's no need to regenerate the picker because + // the ring didn't change. + sendUpdate = true + case connectivity.TransientFailure: + // Save error to be reported via picker. + b.connErr = state.ConnectionError + // Regenerate picker to update error message. + b.regeneratePicker() + sendUpdate = true + case connectivity.Shutdown: + // When an address was removed by resolver, b called RemoveSubConn but + // kept the sc's state in scStates. Remove state for this sc here. 
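
Stepping back to the zero-address handling in UpdateClientConnState above: returning balancer.ErrBadResolverState is the signal that makes the ClientConn ask the resolver to re-resolve. A self-contained sketch of just that contract (demoBalancer is an illustrative type, not this patch's balancer):

	package main

	import (
		"errors"
		"fmt"

		"google.golang.org/grpc/balancer"
	)

	type demoBalancer struct{ resolverErr error }

	func (b *demoBalancer) ResolverError(err error) { b.resolverErr = err }

	func (b *demoBalancer) UpdateClientConnState(s balancer.ClientConnState) error {
		if len(s.ResolverState.Addresses) == 0 {
			// Record the error so a failing picker can report it, and tell
			// the ClientConn to trigger a re-resolve.
			b.ResolverError(errors.New("produced zero addresses"))
			return balancer.ErrBadResolverState
		}
		return nil
	}

	func main() {
		b := &demoBalancer{}
		err := b.UpdateClientConnState(balancer.ClientConnState{})
		fmt.Println(errors.Is(err, balancer.ErrBadResolverState)) // true
	}
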
+ delete(b.scStates, sc) + } + + if sendUpdate { + b.cc.UpdateState(balancer.State{ConnectivityState: b.state, Picker: b.picker}) + } +} + +// mergeErrors builds an error from the last connection error and the last +// resolver error. Must only be called if b.state is TransientFailure. +func (b *ringhashBalancer) mergeErrors() error { + // connErr must always be non-nil unless there are no SubConns, in which + // case resolverErr must be non-nil. + if b.connErr == nil { + return fmt.Errorf("last resolver error: %v", b.resolverErr) + } + if b.resolverErr == nil { + return fmt.Errorf("last connection error: %v", b.connErr) + } + return fmt.Errorf("last connection error: %v; last resolver error: %v", b.connErr, b.resolverErr) +} + +func (b *ringhashBalancer) regeneratePicker() { + if b.state == connectivity.TransientFailure { + b.picker = base.NewErrPicker(b.mergeErrors()) + return + } + b.picker = newPicker(b.ring, b.logger) +} + +func (b *ringhashBalancer) Close() {} + +// connectivityStateEvaluator takes the connectivity states of multiple SubConns +// and returns one aggregated connectivity state. +// +// It's not thread safe. +type connectivityStateEvaluator struct { + nums [5]uint64 +} + +// recordTransition records state change happening in subConn and based on that +// it evaluates what aggregated state should be. +// +// - If there is at least one subchannel in READY state, report READY. +// - If there are 2 or more subchannels in TRANSIENT_FAILURE state, report TRANSIENT_FAILURE. +// - If there is at least one subchannel in CONNECTING state, report CONNECTING. +// - If there is at least one subchannel in Idle state, report Idle. +// - Otherwise, report TRANSIENT_FAILURE. +// +// Note that if there are 1 connecting, 2 transient failure, the overall state +// is transient failure. This is because the second transient failure is a +// fallback of the first failing SubConn, and we want to report transient +// failure to failover to the lower priority. +func (cse *connectivityStateEvaluator) recordTransition(oldState, newState connectivity.State) connectivity.State { + // Update counters. + for idx, state := range []connectivity.State{oldState, newState} { + updateVal := 2*uint64(idx) - 1 // -1 for oldState and +1 for new. 
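
The 2*uint64(idx) - 1 above leans on unsigned wraparound: for idx 0 it evaluates to MaxUint64, and adding MaxUint64 to a counter is the same as subtracting 1 modulo 2^64, so one += statement both decrements the old state's count and increments the new state's. A runnable demonstration:

	package main

	import "fmt"

	func main() {
		var count uint64 = 5
		dec := 2*uint64(0) - 1 // idx 0 (oldState): wraps to 18446744073709551615
		inc := 2*uint64(1) - 1 // idx 1 (newState): 1
		count += dec
		fmt.Println(count) // 4: adding MaxUint64 subtracted 1 (mod 2^64)
		count += inc
		fmt.Println(count) // 5: adding 1 incremented the count
	}
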
+ cse.nums[state] += updateVal + } + + if cse.nums[connectivity.Ready] > 0 { + return connectivity.Ready + } + if cse.nums[connectivity.TransientFailure] > 1 { + return connectivity.TransientFailure + } + if cse.nums[connectivity.Connecting] > 0 { + return connectivity.Connecting + } + if cse.nums[connectivity.Idle] > 0 { + return connectivity.Idle + } + return connectivity.TransientFailure +} diff --git a/xds/internal/balancer/ringhash/ringhash_test.go b/xds/internal/balancer/ringhash/ringhash_test.go index bf5da95bf..fb85367e4 100644 --- a/xds/internal/balancer/ringhash/ringhash_test.go +++ b/xds/internal/balancer/ringhash/ringhash_test.go @@ -19,10 +19,18 @@ package ringhash import ( + "context" + "fmt" + "testing" "time" "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" + "google.golang.org/grpc/attributes" + "google.golang.org/grpc/balancer" + "google.golang.org/grpc/balancer/weightedroundrobin" + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/resolver" "google.golang.org/grpc/xds/internal/testutils" ) @@ -34,5 +42,417 @@ var ( ) const ( + defaultTestTimeout = 10 * time.Second defaultTestShortTimeout = 10 * time.Millisecond + + testBackendAddrsCount = 12 ) + +var ( + testBackendAddrStrs []string + testConfig = &LBConfig{MinRingSize: 1, MaxRingSize: 10} +) + +func init() { + for i := 0; i < testBackendAddrsCount; i++ { + testBackendAddrStrs = append(testBackendAddrStrs, fmt.Sprintf("%d.%d.%d.%d:%d", i, i, i, i, i)) + } +} + +func ctxWithHash(h uint64) context.Context { + return SetRequestHash(context.Background(), h) +} + +// setupTest creates the balancer, and does an initial sanity check. +func setupTest(t *testing.T, addrs []resolver.Address) (*testutils.TestClientConn, balancer.Balancer, balancer.Picker) { + t.Helper() + cc := testutils.NewTestClientConn(t) + builder := balancer.Get(Name) + b := builder.Build(cc, balancer.BuildOptions{}) + if b == nil { + t.Fatalf("builder.Build(%s) failed and returned nil", Name) + } + if err := b.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{Addresses: addrs}, + BalancerConfig: testConfig, + }); err != nil { + t.Fatalf("UpdateClientConnState returned err: %v", err) + } + + for _, addr := range addrs { + addr1 := <-cc.NewSubConnAddrsCh + if want := []resolver.Address{addr}; !cmp.Equal(addr1, want, cmp.AllowUnexported(attributes.Attributes{})) { + t.Fatalf("got unexpected new subconn addrs: %v", cmp.Diff(addr1, want, cmp.AllowUnexported(attributes.Attributes{}))) + } + sc1 := <-cc.NewSubConnCh + // All the SubConns start in Idle, and should not Connect(). + select { + case <-sc1.(*testutils.TestSubConn).ConnectCh: + t.Errorf("unexpected Connect() from SubConn %v", sc1) + case <-time.After(defaultTestShortTimeout): + } + } + + // Should also have a picker, with all SubConns in Idle. + p1 := <-cc.NewPickerCh + return cc, b, p1 +} + +func TestOneSubConn(t *testing.T) { + wantAddr1 := resolver.Address{Addr: testBackendAddrStrs[0]} + cc, b, p0 := setupTest(t, []resolver.Address{wantAddr1}) + ring0 := p0.(*picker).ring + + firstHash := ring0.items[0].hash + // firstHash-1 will pick the first (and only) SubConn from the ring. + testHash := firstHash - 1 + // The first pick should be queued, and should trigger Connect() on the only + // SubConn. 
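
Why testHash := firstHash - 1 deterministically lands on items[0]: pick returns the first ring entry whose hash is at least the request hash, wrapping past the end. A minimal stand-in for the ring used by these tests (illustrative hashes; the real ring hashes address/index pairs):

	package main

	import (
		"fmt"
		"sort"
	)

	type entry struct {
		hash uint64
		addr string
	}

	func pick(ring []entry, h uint64) entry {
		// First entry with hash >= h; wrap to entry 0 past the end.
		i := sort.Search(len(ring), func(i int) bool { return ring[i].hash >= h })
		if i == len(ring) {
			i = 0
		}
		return ring[i]
	}

	func main() {
		ring := []entry{{100, "a"}, {200, "b"}, {300, "c"}}
		fmt.Println(pick(ring, 99).addr)  // "a": 100 is the first hash >= 99
		fmt.Println(pick(ring, 301).addr) // "a": wraps around the ring
	}
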
+	if _, err := p0.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}); err != balancer.ErrNoSubConnAvailable {
+		t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable)
+	}
+	sc0 := ring0.items[0].sc.sc
+	select {
+	case <-sc0.(*testutils.TestSubConn).ConnectCh:
+	case <-time.After(defaultTestTimeout):
+		t.Errorf("timeout waiting for Connect() from SubConn %v", sc0)
+	}
+
+	// Send state updates to Ready.
+	b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
+	b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
+
+	// Test pick with one backend.
+	p1 := <-cc.NewPickerCh
+	for i := 0; i < 5; i++ {
+		gotSCSt, _ := p1.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
+		if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) {
+			t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0)
+		}
+	}
+}
+
+// TestThreeSubConnsAffinity covers that with 3 SubConns, RPCs with the same
+// hash always pick the same SubConn. When the picked SubConn goes down,
+// another one is picked instead.
+func TestThreeSubConnsAffinity(t *testing.T) {
+	wantAddrs := []resolver.Address{
+		{Addr: testBackendAddrStrs[0]},
+		{Addr: testBackendAddrStrs[1]},
+		{Addr: testBackendAddrStrs[2]},
+	}
+	cc, b, p0 := setupTest(t, wantAddrs)
+	// This test doesn't update addresses, so this ring will be used by all the
+	// pickers.
+	ring0 := p0.(*picker).ring
+
+	firstHash := ring0.items[0].hash
+	// firstHash+1 will pick the second SubConn from the ring.
+	testHash := firstHash + 1
+	// The first pick should be queued, and should trigger Connect() on the
+	// picked SubConn.
+	if _, err := p0.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}); err != balancer.ErrNoSubConnAvailable {
+		t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable)
+	}
+	// The picked SubConn should be the second in the ring.
+	sc0 := ring0.items[1].sc.sc
+	select {
+	case <-sc0.(*testutils.TestSubConn).ConnectCh:
+	case <-time.After(defaultTestTimeout):
+		t.Errorf("timeout waiting for Connect() from SubConn %v", sc0)
+	}
+
+	// Send state updates to Ready.
+	b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
+	b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
+	p1 := <-cc.NewPickerCh
+	for i := 0; i < 5; i++ {
+		gotSCSt, _ := p1.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
+		if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) {
+			t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0)
+		}
+	}
+
+	// Turn down the subConn in use.
+	b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
+	p2 := <-cc.NewPickerCh
+	// Pick with the same hash should be queued, because the SubConn after the
+	// one picked is Idle.
+	if _, err := p2.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}); err != balancer.ErrNoSubConnAvailable {
+		t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable)
+	}
+
+	// The third SubConn in the ring should connect.
+	sc1 := ring0.items[2].sc.sc
+	select {
+	case <-sc1.(*testutils.TestSubConn).ConnectCh:
+	case <-time.After(defaultTestTimeout):
+		t.Errorf("timeout waiting for Connect() from SubConn %v", sc1)
+	}
+
+	// Send state updates to Ready.
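
The Connecting-then-Ready pair that follows recurs throughout these tests: it replays the transitions a real ClientConn would deliver while a connection is established. As a reusable sketch (the helper name is illustrative; the tests inline the two calls):

	package sketch

	import (
		"google.golang.org/grpc/balancer"
		"google.golang.org/grpc/connectivity"
	)

	// driveToReady walks a SubConn through the same state updates the real
	// ClientConn would send while it connects.
	func driveToReady(b balancer.Balancer, sc balancer.SubConn) {
		b.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
		b.UpdateSubConnState(sc, balancer.SubConnState{ConnectivityState: connectivity.Ready})
	}
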
+	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
+	b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready})
+	// New picks should all return this SubConn.
+	p3 := <-cc.NewPickerCh
+	for i := 0; i < 5; i++ {
+		gotSCSt, _ := p3.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
+		if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) {
+			t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1)
+		}
+	}
+
+	// Now, after backoff, the first picked SubConn will turn Idle.
+	b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Idle})
+	// The picks above should have queued Connect() for the first picked
+	// SubConn, so this Idle state change will trigger a Connect().
+	select {
+	case <-sc0.(*testutils.TestSubConn).ConnectCh:
+	case <-time.After(defaultTestTimeout):
+		t.Errorf("timeout waiting for Connect() from SubConn %v", sc0)
+	}
+
+	// After the first picked SubConn turns Ready, new picks should return it
+	// again (even though the second picked SubConn is also Ready).
+	b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
+	b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
+	p4 := <-cc.NewPickerCh
+	for i := 0; i < 5; i++ {
+		gotSCSt, _ := p4.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
+		if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) {
+			t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0)
+		}
+	}
+}
+
+// TestThreeSubConnsAffinityMultiple covers that with 3 SubConns, RPCs with the
+// same hash always pick the same SubConn. It then tries a different hash to
+// pick another backend, and verifies that the first hash still picks the first
+// backend.
+func TestThreeSubConnsAffinityMultiple(t *testing.T) {
+	wantAddrs := []resolver.Address{
+		{Addr: testBackendAddrStrs[0]},
+		{Addr: testBackendAddrStrs[1]},
+		{Addr: testBackendAddrStrs[2]},
+	}
+	cc, b, p0 := setupTest(t, wantAddrs)
+	// This test doesn't update addresses, so this ring will be used by all the
+	// pickers.
+	ring0 := p0.(*picker).ring
+
+	firstHash := ring0.items[0].hash
+	// firstHash+1 will pick the second SubConn from the ring.
+	testHash := firstHash + 1
+	// The first pick should be queued, and should trigger Connect() on the
+	// picked SubConn.
+	if _, err := p0.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}); err != balancer.ErrNoSubConnAvailable {
+		t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable)
+	}
+	sc0 := ring0.items[1].sc.sc
+	select {
+	case <-sc0.(*testutils.TestSubConn).ConnectCh:
+	case <-time.After(defaultTestTimeout):
+		t.Errorf("timeout waiting for Connect() from SubConn %v", sc0)
+	}
+
+	// Send state updates to Ready.
+	b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
+	b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
+
+	// First hash should always pick sc0.
+	p1 := <-cc.NewPickerCh
+	for i := 0; i < 5; i++ {
+		gotSCSt, _ := p1.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)})
+		if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) {
+			t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0)
+		}
+	}
+
+	secondHash := ring0.items[1].hash
+	// secondHash+1 will pick the third SubConn from the ring.
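
The ctxWithHash helper these picks use wraps this package's SetRequestHash; in production the xds config selector computes the hash from the route's hash policies and stores it the same way. A stand-in for that plumbing (the real context key is unexported in this package; this one is illustrative):

	package main

	import (
		"context"
		"fmt"
	)

	type hashKey struct{} // illustrative; the real key lives in this package

	func setRequestHash(ctx context.Context, h uint64) context.Context {
		return context.WithValue(ctx, hashKey{}, h)
	}

	func getRequestHash(ctx context.Context) uint64 {
		h, _ := ctx.Value(hashKey{}).(uint64)
		return h
	}

	func main() {
		ctx := setRequestHash(context.Background(), 42)
		fmt.Println(getRequestHash(ctx)) // 42: the picker reads the hash back
	}
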
+ testHash2 := secondHash + 1 + if _, err := p0.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash2)}); err != balancer.ErrNoSubConnAvailable { + t.Fatalf("first pick returned err %v, want %v", err, balancer.ErrNoSubConnAvailable) + } + sc1 := ring0.items[2].sc.sc + select { + case <-sc1.(*testutils.TestSubConn).ConnectCh: + case <-time.After(defaultTestTimeout): + t.Errorf("timeout waiting for Connect() from SubConn %v", sc1) + } + b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Connecting}) + b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Ready}) + + // With the new generated picker, hash2 always picks sc1. + p2 := <-cc.NewPickerCh + for i := 0; i < 5; i++ { + gotSCSt, _ := p2.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash2)}) + if !cmp.Equal(gotSCSt.SubConn, sc1, cmp.AllowUnexported(testutils.TestSubConn{})) { + t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc1) + } + } + // But the first hash still picks sc0. + for i := 0; i < 5; i++ { + gotSCSt, _ := p2.Pick(balancer.PickInfo{Ctx: ctxWithHash(testHash)}) + if !cmp.Equal(gotSCSt.SubConn, sc0, cmp.AllowUnexported(testutils.TestSubConn{})) { + t.Fatalf("picker.Pick, got %v, want SubConn=%v", gotSCSt, sc0) + } + } +} + +func TestAddrWeightChange(t *testing.T) { + wantAddrs := []resolver.Address{ + {Addr: testBackendAddrStrs[0]}, + {Addr: testBackendAddrStrs[1]}, + {Addr: testBackendAddrStrs[2]}, + } + cc, b, p0 := setupTest(t, wantAddrs) + ring0 := p0.(*picker).ring + + if err := b.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{Addresses: wantAddrs}, + BalancerConfig: nil, + }); err != nil { + t.Fatalf("UpdateClientConnState returned err: %v", err) + } + select { + case <-cc.NewPickerCh: + t.Fatalf("unexpected picker after UpdateClientConn with the same addresses") + case <-time.After(defaultTestShortTimeout): + } + + // Delete an address, should send a new Picker. + if err := b.UpdateClientConnState(balancer.ClientConnState{ + ResolverState: resolver.State{Addresses: []resolver.Address{ + {Addr: testBackendAddrStrs[0]}, + {Addr: testBackendAddrStrs[1]}, + }}, + BalancerConfig: nil, + }); err != nil { + t.Fatalf("UpdateClientConnState returned err: %v", err) + } + var p1 balancer.Picker + select { + case p1 = <-cc.NewPickerCh: + case <-time.After(defaultTestTimeout): + t.Fatalf("timeout waiting for picker after UpdateClientConn with different addresses") + } + ring1 := p1.(*picker).ring + if ring1 == ring0 { + t.Fatalf("new picker after removing address has the same ring as before, want different") + } + + // Another update with the same addresses, but different weight. 
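
The update that follows changes only a weight, which still must regenerate the ring. What makes this detectable: updateAddresses keys its subConns map on the address with Attributes stripped and Metadata set to the weight, so the same host:port with a new weight is a different map key. A sketch:

	package main

	import (
		"fmt"

		"google.golang.org/grpc/resolver"
	)

	func main() {
		// Same address, different weights stored in Metadata (as
		// updateAddresses does); they are distinct map keys.
		a1 := resolver.Address{Addr: "10.0.0.1:443", Metadata: uint32(1)}
		a2 := resolver.Address{Addr: "10.0.0.1:443", Metadata: uint32(2)}
		m := map[resolver.Address]bool{a1: true}
		fmt.Println(m[a2]) // false: the weight change reads as a new address
	}
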
+	if err := b.UpdateClientConnState(balancer.ClientConnState{
+		ResolverState: resolver.State{Addresses: []resolver.Address{
+			{Addr: testBackendAddrStrs[0]},
+			weightedroundrobin.SetAddrInfo(
+				resolver.Address{Addr: testBackendAddrStrs[1]},
+				weightedroundrobin.AddrInfo{Weight: 2}),
+		}},
+		BalancerConfig: nil,
+	}); err != nil {
+		t.Fatalf("UpdateClientConnState returned err: %v", err)
+	}
+	var p2 balancer.Picker
+	select {
+	case p2 = <-cc.NewPickerCh:
+	case <-time.After(defaultTestTimeout):
+		t.Fatalf("timeout waiting for picker after UpdateClientConn with different addresses")
+	}
+	if p2.(*picker).ring == ring1 {
+		t.Fatalf("new picker after changing address weight has the same ring as before, want different")
+	}
+}
+
+// TestSubConnToConnectWhenOverallTransientFailure covers that when the overall
+// state is TransientFailure, SubConns turning Idle are triggered to Connect().
+// When the overall state is not TransientFailure, they are not.
+func TestSubConnToConnectWhenOverallTransientFailure(t *testing.T) {
+	wantAddrs := []resolver.Address{
+		{Addr: testBackendAddrStrs[0]},
+		{Addr: testBackendAddrStrs[1]},
+		{Addr: testBackendAddrStrs[2]},
+	}
+	_, b, p0 := setupTest(t, wantAddrs)
+	ring0 := p0.(*picker).ring
+
+	// Turn all SubConns to TransientFailure.
+	for _, it := range ring0.items {
+		b.UpdateSubConnState(it.sc.sc, balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})
+	}
+
+	// The next one turning Idle should Connect().
+	sc0 := ring0.items[0].sc.sc
+	b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Idle})
+	select {
+	case <-sc0.(*testutils.TestSubConn).ConnectCh:
+	case <-time.After(defaultTestTimeout):
+		t.Errorf("timeout waiting for Connect() from SubConn %v", sc0)
+	}
+
+	// Once this SubConn is Ready, other SubConns turning Idle will not
+	// Connect().
+	b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Connecting})
+	b.UpdateSubConnState(sc0, balancer.SubConnState{ConnectivityState: connectivity.Ready})
+
+	// Another SubConn turning Idle now should not Connect(), because the
+	// overall state is no longer TransientFailure.
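
The assertion that follows pins down the rule from UpdateSubConnState: an Idle SubConn is asked to Connect() only while the aggregated state is TransientFailure. Pulled out as a standalone predicate (a sketch; the patch implements this inline):

	package sketch

	import "google.golang.org/grpc/connectivity"

	// shouldQueueConnect reports whether an Idle SubConn should be told to
	// Connect(), per the balancer's recovery rule: keep dialing only while
	// the balancer as a whole is failing.
	func shouldQueueConnect(aggregatedState, scState connectivity.State) bool {
		return scState == connectivity.Idle && aggregatedState == connectivity.TransientFailure
	}
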
+ sc1 := ring0.items[1].sc.sc + b.UpdateSubConnState(sc1, balancer.SubConnState{ConnectivityState: connectivity.Idle}) + select { + case <-sc1.(*testutils.TestSubConn).ConnectCh: + t.Errorf("unexpected Connect() from SubConn %v", sc1) + case <-time.After(defaultTestShortTimeout): + } +} + +func TestConnectivityStateEvaluatorRecordTransition(t *testing.T) { + tests := []struct { + name string + from, to []connectivity.State + want connectivity.State + }{ + { + name: "one ready", + from: []connectivity.State{connectivity.Idle}, + to: []connectivity.State{connectivity.Ready}, + want: connectivity.Ready, + }, + { + name: "one connecting", + from: []connectivity.State{connectivity.Idle}, + to: []connectivity.State{connectivity.Connecting}, + want: connectivity.Connecting, + }, + { + name: "one ready one transient failure", + from: []connectivity.State{connectivity.Idle, connectivity.Idle}, + to: []connectivity.State{connectivity.Ready, connectivity.TransientFailure}, + want: connectivity.Ready, + }, + { + name: "one connecting one transient failure", + from: []connectivity.State{connectivity.Idle, connectivity.Idle}, + to: []connectivity.State{connectivity.Connecting, connectivity.TransientFailure}, + want: connectivity.Connecting, + }, + { + name: "one connecting two transient failure", + from: []connectivity.State{connectivity.Idle, connectivity.Idle, connectivity.Idle}, + to: []connectivity.State{connectivity.Connecting, connectivity.TransientFailure, connectivity.TransientFailure}, + want: connectivity.TransientFailure, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cse := &connectivityStateEvaluator{} + var got connectivity.State + for i, fff := range tt.from { + ttt := tt.to[i] + got = cse.recordTransition(fff, ttt) + } + if got != tt.want { + t.Errorf("recordTransition() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/xds/internal/test/xds_client_affinity_test.go b/xds/internal/test/xds_client_affinity_test.go new file mode 100644 index 000000000..e9ddfe157 --- /dev/null +++ b/xds/internal/test/xds_client_affinity_test.go @@ -0,0 +1,136 @@ +//go:build !386 +// +build !386 + +/* + * + * Copyright 2021 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package xds_test + +import ( + "context" + "fmt" + "testing" + + v3clusterpb "github.com/envoyproxy/go-control-plane/envoy/config/cluster/v3" + v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3" + v3routepb "github.com/envoyproxy/go-control-plane/envoy/config/route/v3" + "google.golang.org/grpc" + "google.golang.org/grpc/credentials/insecure" + "google.golang.org/grpc/internal/xds/env" + testpb "google.golang.org/grpc/test/grpc_testing" + "google.golang.org/grpc/xds/internal/testutils/e2e" +) + +const hashHeaderName = "session_id" + +// hashRouteConfig returns a RouteConfig resource with hash policy set to +// header "session_id". 
+func hashRouteConfig(routeName, ldsTarget, clusterName string) *v3routepb.RouteConfiguration { + return &v3routepb.RouteConfiguration{ + Name: routeName, + VirtualHosts: []*v3routepb.VirtualHost{{ + Domains: []string{ldsTarget}, + Routes: []*v3routepb.Route{{ + Match: &v3routepb.RouteMatch{PathSpecifier: &v3routepb.RouteMatch_Prefix{Prefix: "/"}}, + Action: &v3routepb.Route_Route{Route: &v3routepb.RouteAction{ + ClusterSpecifier: &v3routepb.RouteAction_Cluster{Cluster: clusterName}, + HashPolicy: []*v3routepb.RouteAction_HashPolicy{{ + PolicySpecifier: &v3routepb.RouteAction_HashPolicy_Header_{ + Header: &v3routepb.RouteAction_HashPolicy_Header{ + HeaderName: hashHeaderName, + }, + }, + Terminal: true, + }}, + }}, + }}, + }}, + } +} + +// ringhashCluster returns a Cluster resource that picks ringhash as the lb +// policy. +func ringhashCluster(clusterName, edsServiceName string) *v3clusterpb.Cluster { + return &v3clusterpb.Cluster{ + Name: clusterName, + ClusterDiscoveryType: &v3clusterpb.Cluster_Type{Type: v3clusterpb.Cluster_EDS}, + EdsClusterConfig: &v3clusterpb.Cluster_EdsClusterConfig{ + EdsConfig: &v3corepb.ConfigSource{ + ConfigSourceSpecifier: &v3corepb.ConfigSource_Ads{ + Ads: &v3corepb.AggregatedConfigSource{}, + }, + }, + ServiceName: edsServiceName, + }, + LbPolicy: v3clusterpb.Cluster_RING_HASH, + } +} + +// TestClientSideAffinitySanityCheck tests that the affinity config can be +// propagated to pick the ring_hash policy. It doesn't test the affinity +// behavior in ring_hash policy. +func (s) TestClientSideAffinitySanityCheck(t *testing.T) { + defer func() func() { + old := env.RingHashSupport + env.RingHashSupport = true + return func() { env.RingHashSupport = old } + }()() + + managementServer, nodeID, _, resolver, cleanup1 := setupManagementServer(t) + defer cleanup1() + + port, cleanup2 := clientSetup(t, &testService{}) + defer cleanup2() + + const serviceName = "my-service-client-side-xds" + resources := e2e.DefaultClientResources(e2e.ResourceParams{ + DialTarget: serviceName, + NodeID: nodeID, + Host: "localhost", + Port: port, + SecLevel: e2e.SecurityLevelNone, + }) + // Replace RDS and CDS resources with ringhash config, but keep the resource + // names. + resources.Routes = []*v3routepb.RouteConfiguration{hashRouteConfig( + resources.Routes[0].Name, + resources.Listeners[0].Name, + resources.Clusters[0].Name, + )} + resources.Clusters = []*v3clusterpb.Cluster{ringhashCluster( + resources.Clusters[0].Name, + resources.Clusters[0].EdsClusterConfig.ServiceName, + )} + ctx, cancel := context.WithTimeout(context.Background(), defaultTestTimeout) + defer cancel() + if err := managementServer.Update(ctx, resources); err != nil { + t.Fatal(err) + } + + // Create a ClientConn and make a successful RPC. + cc, err := grpc.Dial(fmt.Sprintf("xds:///%s", serviceName), grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithResolvers(resolver)) + if err != nil { + t.Fatalf("failed to dial local test server: %v", err) + } + defer cc.Close() + + client := testpb.NewTestServiceClient(cc) + if _, err := client.EmptyCall(ctx, &testpb.Empty{}, grpc.WaitForReady(true)); err != nil { + t.Fatalf("rpc EmptyCall() failed: %v", err) + } +}
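
Finally, what this affinity means for an application: with the header hash policy above, RPCs that carry the same "session_id" metadata value hash identically and therefore stick to the same backend. A sketch of client usage (the target and session value are illustrative):

	package main

	import (
		"context"

		"google.golang.org/grpc"
		"google.golang.org/grpc/credentials/insecure"
		"google.golang.org/grpc/metadata"
		testpb "google.golang.org/grpc/test/grpc_testing"
		_ "google.golang.org/grpc/xds" // registers the xds resolver
	)

	func main() {
		cc, err := grpc.Dial("xds:///my-service-client-side-xds",
			grpc.WithTransportCredentials(insecure.NewCredentials()))
		if err != nil {
			panic(err)
		}
		defer cc.Close()

		client := testpb.NewTestServiceClient(cc)
		ctx := metadata.AppendToOutgoingContext(context.Background(), "session_id", "abc-123")
		// Both RPCs hash "abc-123", so ring_hash routes them to the same backend.
		for i := 0; i < 2; i++ {
			if _, err := client.EmptyCall(ctx, &testpb.Empty{}); err != nil {
				panic(err)
			}
		}
	}
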