From 06c38685955f4d0c6ef86140de4e70663f13f75d Mon Sep 17 00:00:00 2001 From: Lei Zhao Date: Thu, 2 Sep 2021 10:10:36 +0800 Subject: [PATCH] select replica for replica read requests using ReplicaSelector (#285) Signed-off-by: youjiali1995 --- internal/locate/region_cache.go | 2 - internal/locate/region_cache_test.go | 108 +--------- internal/locate/region_request.go | 256 ++++++++++++++---------- internal/locate/region_request3_test.go | 188 +++++++++++++---- tikvrpc/tikvrpc.go | 6 + 5 files changed, 307 insertions(+), 253 deletions(-) diff --git a/internal/locate/region_cache.go b/internal/locate/region_cache.go index cf926b6b..01629cab 100644 --- a/internal/locate/region_cache.go +++ b/internal/locate/region_cache.go @@ -468,8 +468,6 @@ type RPCContext struct { ProxyStore *Store // nil means proxy is not used ProxyAddr string // valid when ProxyStore is not nil TiKVNum int // Number of TiKV nodes among the region's peers. Assuming non-TiKV peers are all TiFlash peers. - - tryTimes int } func (c *RPCContext) String() string { diff --git a/internal/locate/region_cache_test.go b/internal/locate/region_cache_test.go index 9abfd91e..91f18dc1 100644 --- a/internal/locate/region_cache_test.go +++ b/internal/locate/region_cache_test.go @@ -44,14 +44,11 @@ import ( "github.com/google/btree" "github.com/pingcap/kvproto/pkg/errorpb" - "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/kvproto/pkg/metapb" "github.com/stretchr/testify/suite" "github.com/tikv/client-go/v2/internal/mockstore/mocktikv" "github.com/tikv/client-go/v2/internal/retry" "github.com/tikv/client-go/v2/kv" - "github.com/tikv/client-go/v2/oracle" - "github.com/tikv/client-go/v2/tikvrpc" pd "github.com/tikv/pd/client" ) @@ -1016,7 +1013,7 @@ func (s *testRegionCacheSuite) TestRegionEpochOnTiFlash() { r := ctxTiFlash.Meta reqSend := NewRegionRequestSender(s.cache, nil) regionErr := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{CurrentRegions: []*metapb.Region{r}}} - reqSend.onRegionError(s.bo, ctxTiFlash, nil, regionErr, nil) + reqSend.onRegionError(s.bo, ctxTiFlash, nil, regionErr) // check leader read should not go to tiflash lctx, err = s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadLeader, 0) @@ -1024,51 +1021,6 @@ func (s *testRegionCacheSuite) TestRegionEpochOnTiFlash() { s.NotEqual(lctx.Peer.Id, s.peer1) } -func (s *testRegionCacheSuite) TestRegionDataNotReady() { - loc1, err := s.cache.LocateKey(s.bo, []byte("a")) - s.Nil(err) - s.Equal(loc1.Region.id, s.region1) - testcases := []struct { - scope string - readType kv.ReplicaReadType - expectPeerID uint64 - expectOptsLen int - expectSeed uint32 - }{ - { - scope: oracle.GlobalTxnScope, - readType: kv.ReplicaReadFollower, - expectPeerID: s.peer2, - expectOptsLen: 1, - expectSeed: 1, - }, - { - scope: "local", - readType: kv.ReplicaReadFollower, - expectPeerID: s.peer2, - expectOptsLen: 0, - expectSeed: 1, - }, - } - - for _, testcase := range testcases { - fctx, err := s.cache.GetTiKVRPCContext(s.bo, loc1.Region, testcase.readType, 0) - s.Nil(err) - s.Equal(fctx.Peer.Id, testcase.expectPeerID) - reqSend := NewRegionRequestSender(s.cache, nil) - regionErr := &errorpb.Error{DataIsNotReady: &errorpb.DataIsNotReady{}} - var opts []StoreSelectorOption - seed := uint32(0) - s.bo.Reset() - req := &tikvrpc.Request{TxnScope: testcase.scope, Context: kvrpcpb.Context{StaleRead: true}, ReplicaReadSeed: &seed} - retry, err := reqSend.onRegionError(s.bo, fctx, req, regionErr, &opts) - s.Nil(err) - s.True(retry) - s.Equal(len(opts), testcase.expectOptsLen) - 
s.Equal(*req.GetReplicaReadSeed(), testcase.expectSeed) - } -} - const regionSplitKeyFormat = "t%08d" func createClusterWithStoresAndRegions(regionCnt, storeCount int) *mocktikv.Cluster { @@ -1284,64 +1236,6 @@ func (s *testRegionCacheSuite) TestMixedReadFallback() { s.Equal(ctx.Peer.Id, s.peer2) } -func (s *testRegionCacheSuite) TestFollowerMeetEpochNotMatch() { - // 3 nodes and no.1 is region1 leader. - store3 := s.cluster.AllocID() - peer3 := s.cluster.AllocID() - s.cluster.AddStore(store3, s.storeAddr(store3)) - s.cluster.AddPeer(s.region1, store3, peer3) - s.cluster.ChangeLeader(s.region1, s.peer1) - - // Check the two regions. - loc1, err := s.cache.LocateKey(s.bo, []byte("a")) - s.Nil(err) - s.Equal(loc1.Region.id, s.region1) - - reqSend := NewRegionRequestSender(s.cache, nil) - - // follower read failed on store2 - followReqSeed := uint32(0) - ctxFollower1, err := s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadFollower, followReqSeed) - s.Nil(err) - s.Equal(ctxFollower1.Peer.Id, s.peer2) - s.Equal(ctxFollower1.Store.storeID, s.store2) - - regionErr := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}} - reqSend.onRegionError(s.bo, ctxFollower1, &tikvrpc.Request{ReplicaReadSeed: &followReqSeed}, regionErr, nil) - s.Equal(followReqSeed, uint32(1)) - - regionErr = &errorpb.Error{RegionNotFound: &errorpb.RegionNotFound{}} - reqSend.onRegionError(s.bo, ctxFollower1, &tikvrpc.Request{ReplicaReadSeed: &followReqSeed}, regionErr, nil) - s.Equal(followReqSeed, uint32(2)) -} - -func (s *testRegionCacheSuite) TestMixedMeetEpochNotMatch() { - // 3 nodes and no.1 is region1 leader. - store3 := s.cluster.AllocID() - peer3 := s.cluster.AllocID() - s.cluster.AddStore(store3, s.storeAddr(store3)) - s.cluster.AddPeer(s.region1, store3, peer3) - s.cluster.ChangeLeader(s.region1, s.peer1) - - // Check the two regions. - loc1, err := s.cache.LocateKey(s.bo, []byte("a")) - s.Nil(err) - s.Equal(loc1.Region.id, s.region1) - - reqSend := NewRegionRequestSender(s.cache, nil) - - // follower read failed on store1 - followReqSeed := uint32(0) - ctxFollower1, err := s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadMixed, followReqSeed) - s.Nil(err) - s.Equal(ctxFollower1.Peer.Id, s.peer1) - s.Equal(ctxFollower1.Store.storeID, s.store1) - - regionErr := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}} - reqSend.onRegionError(s.bo, ctxFollower1, &tikvrpc.Request{ReplicaReadSeed: &followReqSeed}, regionErr, nil) - s.Equal(followReqSeed, uint32(1)) -} - func (s *testRegionCacheSuite) TestPeersLenChange() { // 2 peers [peer1, peer2] and let peer2 become leader loc, err := s.cache.LocateKey(s.bo, []byte("a")) diff --git a/internal/locate/region_request.go b/internal/locate/region_request.go index 6ccfd455..c28b075f 100644 --- a/internal/locate/region_request.go +++ b/internal/locate/region_request.go @@ -60,7 +60,6 @@ import ( "github.com/tikv/client-go/v2/internal/retry" "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/metrics" - "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/util" ) @@ -99,13 +98,13 @@ func LoadShuttingDown() uint32 { // For other region errors, since region range have changed, the request may need to // split, so we simply return the error to caller. 
type RegionRequestSender struct { - regionCache *RegionCache - client client.Client - storeAddr string - rpcError error - leaderReplicaSelector *replicaSelector - failStoreIDs map[uint64]struct{} - failProxyStoreIDs map[uint64]struct{} + regionCache *RegionCache + client client.Client + storeAddr string + rpcError error + replicaSelector *replicaSelector + failStoreIDs map[uint64]struct{} + failProxyStoreIDs map[uint64]struct{} RegionRequestRuntimeStats } @@ -240,14 +239,20 @@ type replica struct { attempts int } +func (r *replica) isEpochStale() bool { + return r.epoch != atomic.LoadUint32(&r.store.epoch) +} + +func (r *replica) isExhausted(maxAttempt int) bool { + return r.attempts >= maxAttempt +} + type replicaSelector struct { regionCache *RegionCache region *Region regionStore *regionStore - // replicas contains all TiKV replicas for now and the leader is at the - // head of the slice. - replicas []*replica - state selectorState + replicas []*replica + state selectorState // replicas[targetIdx] is the replica handling the request this time targetIdx AccessIndex // replicas[proxyIdx] is the store used to redirect requests this time @@ -317,7 +322,7 @@ type accessKnownLeader struct { func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { leader := selector.replicas[state.leaderIdx] - if leader.attempts >= maxReplicaAttempt { + if leader.isExhausted(maxReplicaAttempt) { selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx} return nil, stateChanged{} } @@ -331,7 +336,7 @@ func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *rep selector.state = &accessByKnownProxy{leaderIdx: state.leaderIdx} return } - if liveness != reachable || selector.targetReplica().attempts >= maxReplicaAttempt { + if liveness != reachable || selector.targetReplica().isExhausted(maxReplicaAttempt) { selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx} } if liveness != reachable { @@ -366,7 +371,7 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) ( } targetReplica = selector.replicas[idx] // Each follower is only tried once - if targetReplica.attempts == 0 { + if !targetReplica.isExhausted(1) { state.lastIdx = idx selector.targetIdx = idx break @@ -429,8 +434,7 @@ func (state *accessByKnownProxy) onNoLeader(selector *replicaSelector) { selector.state = &invalidLeader{} } -// tryNewProxy is the state where we try to find a node from followers -// as proxy. +// tryNewProxy is the state where we try to find a node from followers as proxy. type tryNewProxy struct { stateBase leaderIdx AccessIndex @@ -477,7 +481,7 @@ func (state *tryNewProxy) next(bo *retry.Backoffer, selector *replicaSelector) ( func (state *tryNewProxy) isCandidate(idx AccessIndex, replica *replica) bool { // Try each peer only once - return idx != state.leaderIdx && replica.attempts == 0 + return idx != state.leaderIdx && !replica.isExhausted(1) } func (state *tryNewProxy) onSendSuccess(selector *replicaSelector) { @@ -494,6 +498,79 @@ func (state *tryNewProxy) onNoLeader(selector *replicaSelector) { selector.state = &invalidLeader{} } +// accessFollower is the state where we are sending requests to TiKV followers. +// If there is no suitable follower, requests will be sent to the leader as a fallback. +type accessFollower struct { + stateBase + // If tryLeader is true, the request can also be sent to the leader. 
+ tryLeader bool + isGlobalStaleRead bool + option storeSelectorOp + leaderIdx AccessIndex + lastIdx AccessIndex +} + +func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) { + if state.lastIdx < 0 { + if state.tryLeader { + state.lastIdx = AccessIndex(rand.Intn(len(selector.replicas))) + } else { + if len(selector.replicas) <= 1 { + state.lastIdx = state.leaderIdx + } else { + // Randomly select a non-leader peer + state.lastIdx = AccessIndex(rand.Intn(len(selector.replicas) - 1)) + if state.lastIdx >= state.leaderIdx { + state.lastIdx++ + } + } + } + } else { + // Stale Read request will retry the leader or next peer on error, + // if txnScope is global, we will only retry the leader by using the WithLeaderOnly option, + // if txnScope is local, we will retry both other peers and the leader by the strategy of replicaSelector. + if state.isGlobalStaleRead { + WithLeaderOnly()(&state.option) + } + state.lastIdx++ + } + + for i := 0; i < len(selector.replicas) && !state.option.leaderOnly; i++ { + idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas)) + if state.isCandidate(idx, selector.replicas[idx]) { + state.lastIdx = idx + selector.targetIdx = idx + break + } + } + // If there is no candidate, fallback to the leader. + if selector.targetIdx < 0 { + leader := selector.replicas[state.leaderIdx] + if leader.isEpochStale() || leader.isExhausted(1) { + metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc() + selector.invalidateRegion() + return nil, nil + } + state.lastIdx = state.leaderIdx + selector.targetIdx = state.leaderIdx + } + return selector.buildRPCContext(bo) +} + +func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) { + if selector.checkLiveness(bo, selector.targetReplica()) != reachable { + selector.invalidateReplicaStore(selector.targetReplica(), cause) + } +} + +func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool { + return !replica.isEpochStale() && !replica.isExhausted(1) && + // The request can only be sent to the leader. + ((state.option.leaderOnly && idx == state.leaderIdx) || + // Choose a replica with matched labels. + (!state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && replica.store.IsLabelsMatch(state.option.labels))) +} + type invalidStore struct { stateBase } @@ -514,7 +591,9 @@ func (state *invalidLeader) next(_ *retry.Backoffer, _ *replicaSelector) (*RPCCo return nil, nil } -func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID) (*replicaSelector, error) { +// newReplicaSelector creates a replicaSelector which selects replicas according to reqType and opts. +// opts is currently only effective for follower read. 
+func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID, req *tikvrpc.Request, opts ...StoreSelectorOption) (*replicaSelector, error) { cachedRegion := regionCache.GetCachedRegionWithRLock(regionID) if cachedRegion == nil || !cachedRegion.isValid() { return nil, nil @@ -530,11 +609,26 @@ func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID) (*replic }) } var state selectorState - if regionCache.enableForwarding && regionStore.proxyTiKVIdx >= 0 { - state = &accessByKnownProxy{leaderIdx: regionStore.workTiKVIdx} + if !req.ReplicaReadType.IsFollowerRead() { + if regionCache.enableForwarding && regionStore.proxyTiKVIdx >= 0 { + state = &accessByKnownProxy{leaderIdx: regionStore.workTiKVIdx} + } else { + state = &accessKnownLeader{leaderIdx: regionStore.workTiKVIdx} + } } else { - state = &accessKnownLeader{leaderIdx: regionStore.workTiKVIdx} + option := storeSelectorOp{} + for _, op := range opts { + op(&option) + } + state = &accessFollower{ + tryLeader: req.ReplicaReadType == kv.ReplicaReadMixed, + isGlobalStaleRead: req.IsGlobalStaleRead(), + option: option, + leaderIdx: regionStore.workTiKVIdx, + lastIdx: -1, + } } + return &replicaSelector{ regionCache, cachedRegion, @@ -598,15 +692,21 @@ func (s *replicaSelector) refreshRegionStore() { return } - // If leader has changed, it means a recent request succeeds an RPC on the new - // leader. Give the leader an addition chance. + // If leader has changed, it means a recent request succeeds an RPC + // on the new leader. if oldRegionStore.workTiKVIdx != newRegionStore.workTiKVIdx { - newLeaderIdx := newRegionStore.workTiKVIdx - s.state = &accessKnownLeader{leaderIdx: newLeaderIdx} - if s.replicas[newLeaderIdx].attempts == maxReplicaAttempt { - s.replicas[newLeaderIdx].attempts-- + switch state := s.state.(type) { + case *accessFollower: + state.leaderIdx = newRegionStore.workTiKVIdx + default: + // Try the new leader and give it an addition chance if the + // request is sent to the leader. + newLeaderIdx := newRegionStore.workTiKVIdx + s.state = &accessKnownLeader{leaderIdx: newLeaderIdx} + if s.replicas[newLeaderIdx].attempts == maxReplicaAttempt { + s.replicas[newLeaderIdx].attempts-- + } } - return } } @@ -614,8 +714,11 @@ func (s *replicaSelector) buildRPCContext(bo *retry.Backoffer) (*RPCContext, err targetReplica, proxyReplica := s.targetReplica(), s.proxyReplica() // Backoff and retry if no replica is selected or the selected replica is stale - if targetReplica == nil || s.isReplicaStoreEpochStale(targetReplica) || - (proxyReplica != nil && s.isReplicaStoreEpochStale(proxyReplica)) { + if targetReplica == nil || targetReplica.isEpochStale() || + (proxyReplica != nil && proxyReplica.isEpochStale()) { + // TODO(youjiali1995): Is it necessary to invalidate the region? + metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("stale_store").Inc() + s.invalidateRegion() return nil, nil } @@ -656,17 +759,6 @@ func (s *replicaSelector) buildRPCContext(bo *retry.Backoffer) (*RPCContext, err return rpcCtx, nil } -func (s *replicaSelector) isReplicaStoreEpochStale(replica *replica) bool { - storeFailEpoch := atomic.LoadUint32(&replica.store.epoch) - if storeFailEpoch != replica.epoch { - // TODO(youjiali1995): Is it necessary to invalidate the region? 
- metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("stale_store").Inc() - s.invalidateRegion() - return true - } - return false -} - func (s *replicaSelector) onSendFailure(bo *retry.Backoffer, err error) { metrics.RegionCacheCounterWithSendFail.Inc() s.state.onSendFailure(bo, s, err) @@ -720,7 +812,7 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) { } for i, replica := range s.replicas { if isSamePeer(replica.peer, leader) { - if replica.attempts == maxReplicaAttempt { + if replica.isExhausted(maxReplicaAttempt) { // Give the replica one more chance and because each follower is tried only once, // it won't result in infinite retry. replica.attempts = maxReplicaAttempt - 1 @@ -755,25 +847,14 @@ func (s *RegionRequestSender) getRPCContext( ) (*RPCContext, error) { switch et { case tikvrpc.TiKV: - // Now only requests sent to the replica leader will use the replica selector to get - // the RPC context. - // TODO(youjiali1995): make all requests use the replica selector. - if req.ReplicaReadType == kv.ReplicaReadLeader { - if s.leaderReplicaSelector == nil { - selector, err := newReplicaSelector(s.regionCache, regionID) - if selector == nil || err != nil { - return nil, err - } - s.leaderReplicaSelector = selector + if s.replicaSelector == nil { + selector, err := newReplicaSelector(s.regionCache, regionID, req, opts...) + if selector == nil || err != nil { + return nil, err } - return s.leaderReplicaSelector.next(bo) + s.replicaSelector = selector } - - var seed uint32 - if req.ReplicaReadSeed != nil { - seed = *req.ReplicaReadSeed - } - return s.regionCache.GetTiKVRPCContext(bo, regionID, req.ReplicaReadType, seed, opts...) + return s.replicaSelector.next(bo) case tikvrpc.TiFlash: return s.regionCache.GetTiFlashRPCContext(bo, regionID, true) case tikvrpc.TiDB: @@ -784,7 +865,7 @@ func (s *RegionRequestSender) getRPCContext( } func (s *RegionRequestSender) reset() { - s.leaderReplicaSelector = nil + s.replicaSelector = nil s.failStoreIDs = nil s.failProxyStoreIDs = nil } @@ -869,9 +950,6 @@ func (s *RegionRequestSender) SendReqCtx( if err != nil { return nil, nil, err } - if rpcCtx != nil { - rpcCtx.tryTimes = tryTimes - } if _, err := util.EvalFailpoint("invalidCacheAndRetry"); err == nil { // cooperate with tikvclient/setGcResolveMaxBackoff @@ -922,7 +1000,7 @@ func (s *RegionRequestSender) SendReqCtx( return nil, nil, errors.Trace(err) } if regionErr != nil { - retry, err = s.onRegionError(bo, rpcCtx, req, regionErr, &opts) + retry, err = s.onRegionError(bo, rpcCtx, req, regionErr) if err != nil { return nil, nil, errors.Trace(err) } @@ -931,8 +1009,8 @@ func (s *RegionRequestSender) SendReqCtx( continue } } else { - if s.leaderReplicaSelector != nil { - s.leaderReplicaSelector.onSendSuccess() + if s.replicaSelector != nil { + s.replicaSelector.onSendSuccess() } } return resp, rpcCtx, nil @@ -1171,8 +1249,8 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, e } if ctx.Meta != nil { - if s.leaderReplicaSelector != nil { - s.leaderReplicaSelector.onSendFailure(bo, err) + if s.replicaSelector != nil { + s.replicaSelector.onSendFailure(bo, err) } else { s.regionCache.OnSendFail(bo, ctx, s.NeedReloadRegion(ctx), err) } @@ -1237,19 +1315,12 @@ func regionErrorToLabel(e *errorpb.Error) string { return "unknown" } -func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, regionErr *errorpb.Error, opts *[]StoreSelectorOption) (shouldRetry bool, err error) { +func (s *RegionRequestSender) 
onRegionError(bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, regionErr *errorpb.Error) (shouldRetry bool, err error) { if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil { span1 := span.Tracer().StartSpan("tikv.onRegionError", opentracing.ChildOf(span.Context())) defer span1.Finish() bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1)) } - // Stale Read request will retry the leader or next peer on error, - // if txnScope is global, we will only retry the leader by using the WithLeaderOnly option, - // if txnScope is local, we will retry both other peers and the leader by the incresing seed. - if ctx.tryTimes < 1 && req != nil && req.TxnScope == oracle.GlobalTxnScope && req.GetStaleRead() { - *opts = append(*opts, WithLeaderOnly()) - } - seed := req.GetReplicaReadSeed() // NOTE: Please add the region error handler in the same order of errorpb.Error. metrics.TiKVRegionErrorCounter.WithLabelValues(regionErrorToLabel(regionErr)).Inc() @@ -1260,8 +1331,8 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext zap.String("notLeader", notLeader.String()), zap.String("ctx", ctx.String())) - if s.leaderReplicaSelector != nil { - return s.leaderReplicaSelector.onNotLeader(bo, ctx, notLeader) + if s.replicaSelector != nil { + return s.replicaSelector.onNotLeader(bo, ctx, notLeader) } else if notLeader.GetLeader() == nil { // The peer doesn't know who is the current leader. Generally it's because // the Raft group is in an election, but it's possible that the peer is @@ -1289,11 +1360,6 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext // This peer is removed from the region. Invalidate the region since it's too stale. if regionErr.GetRegionNotFound() != nil { - if seed != nil { - logutil.BgLogger().Debug("tikv reports `RegionNotFound` in follow-reader", - zap.Stringer("ctx", ctx), zap.Uint32("seed", *seed)) - *seed = *seed + 1 - } s.regionCache.InvalidateCachedRegion(ctx.Region) return false, nil } @@ -1308,12 +1374,9 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext logutil.BgLogger().Debug("tikv reports `EpochNotMatch` retry later", zap.Stringer("EpochNotMatch", epochNotMatch), zap.Stringer("ctx", ctx)) - if seed != nil { - *seed = *seed + 1 - } retry, err := s.regionCache.OnRegionEpochNotMatch(bo, ctx, epochNotMatch.CurrentRegions) - if !retry && s.leaderReplicaSelector != nil { - s.leaderReplicaSelector.invalidateRegion() + if !retry && s.replicaSelector != nil { + s.replicaSelector.invalidateRegion() } return retry, errors.Trace(err) } @@ -1338,9 +1401,9 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext // but we don't handle it now. if regionErr.GetStaleCommand() != nil { logutil.BgLogger().Debug("tikv reports `StaleCommand`", zap.Stringer("ctx", ctx)) - if s.leaderReplicaSelector != nil { + if s.replicaSelector != nil { // Needn't backoff because the new leader should be elected soon - // and the leaderReplicaSelector will try the next peer. + // and the replicaSelector will try the next peer. 
} else { err = bo.Backoff(retry.BoStaleCmd, errors.Errorf("stale command, ctx: %v", ctx)) if err != nil { @@ -1384,9 +1447,6 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext if err != nil { return false, errors.Trace(err) } - if seed != nil { - *seed = *seed + 1 - } return true, nil } @@ -1396,9 +1456,6 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext zap.Uint64("store-id", ctx.Store.storeID), zap.Uint64("region-id", regionErr.GetRegionNotInitialized().GetRegionId()), zap.Stringer("ctx", ctx)) - if seed != nil { - *seed = *seed + 1 - } // The region can't provide service until split or merge finished, so backoff. err = bo.Backoff(retry.BoRegionScheduling, errors.Errorf("read index not ready, ctx: %v", ctx)) if err != nil { @@ -1431,9 +1488,6 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext if err != nil { return false, errors.Trace(err) } - if seed != nil { - *seed = *seed + 1 - } return true, nil } @@ -1441,7 +1495,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext zap.Stringer("regionErr", regionErr), zap.Stringer("ctx", ctx)) - if s.leaderReplicaSelector != nil { + if s.replicaSelector != nil { // Try the next replica. return true, nil } diff --git a/internal/locate/region_request3_test.go b/internal/locate/region_request3_test.go index 7b1f4678..51dc22e0 100644 --- a/internal/locate/region_request3_test.go +++ b/internal/locate/region_request3_test.go @@ -50,6 +50,7 @@ import ( "github.com/tikv/client-go/v2/internal/mockstore/mocktikv" "github.com/tikv/client-go/v2/internal/retry" "github.com/tikv/client-go/v2/kv" + "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikvrpc" ) @@ -86,35 +87,6 @@ func (s *testRegionRequestToThreeStoresSuite) TearDownTest() { s.mvccStore.Close() } -func (s *testRegionRequestToThreeStoresSuite) TestGetRPCContext() { - // Load the bootstrapped region into the cache. - _, err := s.cache.BatchLoadRegionsFromKey(s.bo, []byte{}, 1) - s.Nil(err) - - var seed uint32 - var regionID = RegionVerID{s.regionID, 0, 0} - - req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kv.ReplicaReadLeader, &seed) - rpcCtx, err := s.regionRequestSender.getRPCContext(s.bo, req, regionID, tikvrpc.TiKV) - s.Nil(err) - s.Equal(rpcCtx.Peer.Id, s.leaderPeer) - - req.ReplicaReadType = kv.ReplicaReadFollower - rpcCtx, err = s.regionRequestSender.getRPCContext(s.bo, req, regionID, tikvrpc.TiKV) - s.Nil(err) - s.NotEqual(rpcCtx.Peer.Id, s.leaderPeer) - - req.ReplicaReadType = kv.ReplicaReadMixed - rpcCtx, err = s.regionRequestSender.getRPCContext(s.bo, req, regionID, tikvrpc.TiKV) - s.Nil(err) - s.Equal(rpcCtx.Peer.Id, s.leaderPeer) - - seed = 1 - rpcCtx, err = s.regionRequestSender.getRPCContext(s.bo, req, regionID, tikvrpc.TiKV) - s.Nil(err) - s.NotEqual(rpcCtx.Peer.Id, s.leaderPeer) -} - func (s *testRegionRequestToThreeStoresSuite) TestStoreTokenLimit() { req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{}, kvrpcpb.Context{}) region, err := s.cache.LocateRegionByID(s.bo, s.regionID) @@ -306,6 +278,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { s.NotNil(regionLoc) region := s.cache.GetCachedRegionWithRLock(regionLoc.Region) regionStore := region.getStore() + req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kvrpcpb.Context{}) // Create a fake region and change its leader to the last peer. 
regionStore = regionStore.clone() @@ -331,7 +304,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { cache.insertRegionToCache(region) // Verify creating the replicaSelector. - replicaSelector, err := newReplicaSelector(cache, regionLoc.Region) + replicaSelector, err := newReplicaSelector(cache, regionLoc.Region, req) s.NotNil(replicaSelector) s.Nil(err) s.Equal(replicaSelector.region, region) @@ -396,7 +369,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { // Test switching to tryFollower if leader is unreachable region.lastAccess = time.Now().Unix() - replicaSelector, err = newReplicaSelector(cache, regionLoc.Region) + replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req) s.Nil(err) s.NotNil(replicaSelector) cache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState { @@ -417,7 +390,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { // Test switching to tryNewProxy if leader is unreachable and forwarding is enabled refreshEpochs(regionStore) cache.enableForwarding = true - replicaSelector, err = newReplicaSelector(cache, regionLoc.Region) + replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req) s.Nil(err) s.NotNil(replicaSelector) cache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState { @@ -461,7 +434,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { // Test initial state is accessByKnownProxy when proxyTiKVIdx is valid refreshEpochs(regionStore) cache.enableForwarding = true - replicaSelector, err = newReplicaSelector(cache, regionLoc.Region) + replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req) s.Nil(err) s.NotNil(replicaSelector) state2, ok := replicaSelector.state.(*accessByKnownProxy) @@ -483,6 +456,103 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() { s.Equal(replicaSelector.targetReplica().attempts, 2) s.Equal(replicaSelector.proxyReplica().attempts, 1) + // Test accessFollower state with kv.ReplicaReadFollower request type. + req = tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kv.ReplicaReadFollower, nil) + refreshEpochs(regionStore) + replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req) + s.Nil(err) + s.NotNil(replicaSelector) + state3, ok := replicaSelector.state.(*accessFollower) + s.True(ok) + s.False(state3.tryLeader) + s.Equal(regionStore.workTiKVIdx, state3.leaderIdx) + s.Equal(state3.lastIdx, AccessIndex(-1)) + + lastIdx := AccessIndex(-1) + for i := 0; i < regionStore.accessStoreNum(tiKVOnly)-1; i++ { + rpcCtx, err := replicaSelector.next(s.bo) + s.Nil(err) + // Should swith to the next follower. + s.NotEqual(lastIdx, state3.lastIdx) + // Shouldn't access the leader if followers aren't exhausted. + s.NotEqual(regionStore.workTiKVIdx, state3.lastIdx) + s.Equal(replicaSelector.targetIdx, state3.lastIdx) + assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil) + lastIdx = state3.lastIdx + } + // Fallback to the leader for 1 time + rpcCtx, err = replicaSelector.next(s.bo) + s.Nil(err) + s.Equal(regionStore.workTiKVIdx, state3.lastIdx) + s.Equal(replicaSelector.targetIdx, state3.lastIdx) + assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil) + // All replicas are exhausted. + rpcCtx, err = replicaSelector.next(s.bo) + s.Nil(rpcCtx) + s.Nil(err) + + // Test accessFollower state filtering epoch-stale stores. 
+ region.lastAccess = time.Now().Unix() + refreshEpochs(regionStore) + // Mark all followers as stale. + tiKVNum := regionStore.accessStoreNum(tiKVOnly) + for i := 1; i < tiKVNum; i++ { + regionStore.storeEpochs[(regionStore.workTiKVIdx+AccessIndex(i))%AccessIndex(tiKVNum)]++ + } + replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req) + s.NotNil(replicaSelector) + s.Nil(err) + state3 = replicaSelector.state.(*accessFollower) + // Should fallback to the leader immediately. + rpcCtx, err = replicaSelector.next(s.bo) + s.Nil(err) + s.Equal(regionStore.workTiKVIdx, state3.lastIdx) + s.Equal(replicaSelector.targetIdx, state3.lastIdx) + assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil) + + // Test accessFollower state filtering label-not-match stores. + region.lastAccess = time.Now().Unix() + refreshEpochs(regionStore) + labels := []*metapb.StoreLabel{ + { + Key: "a", + Value: "b", + }, + } + regionStore.workTiKVIdx = AccessIndex(0) + accessIdx := AccessIndex(regionStore.accessStoreNum(tiKVOnly) - 1) + _, store := regionStore.accessStore(tiKVOnly, accessIdx) + store.labels = labels + for i := 0; i < 5; i++ { + replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req, WithMatchLabels(labels)) + s.NotNil(replicaSelector) + s.Nil(err) + rpcCtx, err = replicaSelector.next(s.bo) + s.Nil(err) + assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[accessIdx], nil) + } + + // Test accessFollower state with leaderOnly option + region.lastAccess = time.Now().Unix() + refreshEpochs(regionStore) + for i := 0; i < 5; i++ { + replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req, WithLeaderOnly()) + s.NotNil(replicaSelector) + s.Nil(err) + rpcCtx, err = replicaSelector.next(s.bo) + s.Nil(err) + // Should always access the leader. + assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil) + } + + // Test accessFollower state with kv.ReplicaReadMixed request type. + region.lastAccess = time.Now().Unix() + refreshEpochs(regionStore) + req.ReplicaReadType = kv.ReplicaReadMixed + replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req) + s.NotNil(replicaSelector) + s.Nil(err) + // Invalidate the region if the leader is not in the region. 
region.lastAccess = time.Now().Unix() replicaSelector.updateLeader(&metapb.Peer{Id: s.cluster.AllocID(), StoreId: s.cluster.AllocID()}) @@ -505,7 +575,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { s.NotNil(region) reloadRegion := func() { - s.regionRequestSender.leaderReplicaSelector.region.invalidate(Other) + s.regionRequestSender.replicaSelector.region.invalidate(Other) region, _ = s.cache.LocateRegionByID(s.bo, s.regionID) } @@ -535,7 +605,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { resp, err = sender.SendReq(bo, req, region.Region, time.Second) s.Nil(err) s.NotNil(resp) - s.Equal(sender.leaderReplicaSelector.targetIdx, AccessIndex(1)) + s.Equal(sender.replicaSelector.targetIdx, AccessIndex(1)) s.True(bo.GetTotalBackoffTimes() == 1) s.cluster.StartStore(s.storeIDs[0]) @@ -544,7 +614,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { resp, err = sender.SendReq(bo, req, region.Region, time.Second) s.Nil(err) s.NotNil(resp) - s.Equal(sender.leaderReplicaSelector.targetIdx, AccessIndex(1)) + s.Equal(sender.replicaSelector.targetIdx, AccessIndex(1)) s.True(bo.GetTotalBackoffTimes() == 0) // Switch to the next peer due to leader failure but the new leader is not elected. @@ -574,7 +644,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { s.Nil(err) s.True(hasFakeRegionError(resp)) s.Equal(bo.GetTotalBackoffTimes(), 3) - s.False(sender.leaderReplicaSelector.region.isValid()) + s.False(sender.replicaSelector.region.isValid()) s.cluster.ChangeLeader(s.regionID, s.peerIDs[0]) // The leader store is alive but can't provide service. @@ -588,7 +658,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { resp, err = sender.SendReq(bo, req, region.Region, time.Second) s.Nil(err) s.True(hasFakeRegionError(resp)) - s.False(sender.leaderReplicaSelector.region.isValid()) + s.False(sender.replicaSelector.region.isValid()) s.Equal(bo.GetTotalBackoffTimes(), maxReplicaAttempt+2) s.cluster.StartStore(s.storeIDs[0]) @@ -622,7 +692,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { resp, err := sender.SendReq(bo, req, region.Region, time.Second) s.Nil(err) s.True(hasFakeRegionError(resp)) - s.False(sender.leaderReplicaSelector.region.isValid()) + s.False(sender.replicaSelector.region.isValid()) s.Equal(bo.GetTotalBackoffTimes(), maxReplicaAttempt+2) }() } @@ -642,7 +712,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { resp, err := sender.SendReq(bo, req, region.Region, time.Second) s.Nil(err) s.True(hasFakeRegionError(resp)) - s.False(sender.leaderReplicaSelector.region.isValid()) + s.False(sender.replicaSelector.region.isValid()) s.Equal(bo.GetTotalBackoffTimes(), 0) }() @@ -661,7 +731,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { resp, err := sender.SendReq(bo, req, region.Region, time.Second) s.Nil(err) s.True(hasFakeRegionError(resp)) - s.False(sender.leaderReplicaSelector.region.isValid()) + s.False(sender.replicaSelector.region.isValid()) s.Equal(bo.GetTotalBackoffTimes(), 0) }() @@ -694,7 +764,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { regionErr, _ := resp.GetRegionError() s.NotNil(regionErr) } - s.False(sender.leaderReplicaSelector.region.isValid()) + s.False(sender.replicaSelector.region.isValid()) s.Equal(bo.GetTotalBackoffTimes(), 0) }() } @@ -712,8 +782,40 @@ func (s 
*testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() { s.Nil(err) s.True(hasFakeRegionError(resp)) s.True(bo.GetTotalBackoffTimes() == 3) - s.False(sender.leaderReplicaSelector.region.isValid()) + s.False(sender.replicaSelector.region.isValid()) for _, store := range s.storeIDs { s.cluster.StartStore(store) } + + // Verify switch to the leader immediately when stale read requests with global txn scope meet region errors. + s.cluster.ChangeLeader(region.Region.id, s.peerIDs[0]) + reloadRegion() + req = tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}) + req.TxnScope = oracle.GlobalTxnScope + req.EnableStaleRead() + for i := 0; i < 5; i++ { + // The request may be sent to the leader directly. We have to distinguish it. + failureOnFollower := false + s.regionRequestSender.client = &fnClient{func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) { + if addr != s.cluster.GetStore(s.storeIDs[0]).Address { + failureOnFollower = true + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{}}}, nil + } + return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{}}, nil + }} + sender.SendReq(bo, req, region.Region, time.Second) + state, ok := sender.replicaSelector.state.(*accessFollower) + s.True(ok) + s.True(!failureOnFollower || state.option.leaderOnly) + totalAttempts := 0 + for idx, replica := range sender.replicaSelector.replicas { + totalAttempts += replica.attempts + if idx == int(state.leaderIdx) { + s.Equal(1, replica.attempts) + } else { + s.True(replica.attempts <= 1) + } + } + s.True(totalAttempts <= 2) + } } diff --git a/tikvrpc/tikvrpc.go b/tikvrpc/tikvrpc.go index 8c2b14dd..0b6f9980 100644 --- a/tikvrpc/tikvrpc.go +++ b/tikvrpc/tikvrpc.go @@ -49,6 +49,7 @@ import ( "github.com/pingcap/kvproto/pkg/mpp" "github.com/pingcap/kvproto/pkg/tikvpb" "github.com/tikv/client-go/v2/kv" + "github.com/tikv/client-go/v2/oracle" ) // CmdType represents the concrete request type in Request or response type in Response. @@ -256,6 +257,11 @@ func (req *Request) EnableStaleRead() { req.ReplicaRead = false } +// IsGlobalStaleRead checks if the request is a global stale read request. +func (req *Request) IsGlobalStaleRead() bool { + return req.TxnScope == oracle.GlobalTxnScope && req.GetStaleRead() +} + // IsDebugReq check whether the req is debug req. func (req *Request) IsDebugReq() bool { switch req.Type {
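
Reviewer note (appended, not part of the patch): a minimal sketch of how the new unified replica-selection path is expected to be exercised for follower reads. It is written in the style of region_request3_test.go, assumes that file's suite wiring (s.cache, s.bo, s.regionID, s.leaderPeer) and its existing imports (tikvrpc, kvrpcpb, kv), and the test name itself is hypothetical.

func (s *testRegionRequestToThreeStoresSuite) TestFollowerReadSelectorSketch() {
	regionLoc, err := s.cache.LocateRegionByID(s.bo, s.regionID)
	s.Nil(err)

	// Follower reads no longer carry a ReplicaReadSeed; the selector decides
	// which replica to access, so the seed argument is nil.
	req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")}, kv.ReplicaReadFollower, nil)

	// Follower-read requests start in the accessFollower state.
	selector, err := newReplicaSelector(s.cache, regionLoc.Region, req)
	s.Nil(err)
	s.NotNil(selector)
	_, ok := selector.state.(*accessFollower)
	s.True(ok)

	// next() picks a non-leader replica first and records it in targetIdx.
	rpcCtx, err := selector.next(s.bo)
	s.Nil(err)
	s.NotNil(rpcCtx)
	s.Equal(rpcCtx.Peer.Id, selector.replicas[selector.targetIdx].peer.Id)

	// Store-selector options still apply; WithLeaderOnly pins the read to the leader.
	leaderSelector, err := newReplicaSelector(s.cache, regionLoc.Region, req, WithLeaderOnly())
	s.Nil(err)
	s.NotNil(leaderSelector)
	rpcCtx, err = leaderSelector.next(s.bo)
	s.Nil(err)
	s.NotNil(rpcCtx)
	s.Equal(rpcCtx.Peer.Id, s.leaderPeer)
}

Compared with the removed seed-based rotation, the selector tries each follower at most once (isExhausted(1)), falls back to the leader when no candidate remains, and only then invalidates the region, which is what the new assertions in TestReplicaSelector encode.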