select replica for replica read requests using ReplicaSelector (#285)

Signed-off-by: youjiali1995 <zlwgx1023@gmail.com>
Lei Zhao 2021-09-02 10:10:36 +08:00 committed by GitHub
parent 1798c19099
commit 06c3868595
5 changed files with 307 additions and 253 deletions


@ -468,8 +468,6 @@ type RPCContext struct {
ProxyStore *Store // nil means proxy is not used
ProxyAddr string // valid when ProxyStore is not nil
TiKVNum int // Number of TiKV nodes among the region's peers. Assuming non-TiKV peers are all TiFlash peers.
tryTimes int
}
func (c *RPCContext) String() string {


@ -44,14 +44,11 @@ import (
"github.com/google/btree"
"github.com/pingcap/kvproto/pkg/errorpb"
"github.com/pingcap/kvproto/pkg/kvrpcpb"
"github.com/pingcap/kvproto/pkg/metapb"
"github.com/stretchr/testify/suite"
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/internal/retry"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikvrpc"
pd "github.com/tikv/pd/client"
)
@ -1016,7 +1013,7 @@ func (s *testRegionCacheSuite) TestRegionEpochOnTiFlash() {
r := ctxTiFlash.Meta
reqSend := NewRegionRequestSender(s.cache, nil)
regionErr := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{CurrentRegions: []*metapb.Region{r}}}
reqSend.onRegionError(s.bo, ctxTiFlash, nil, regionErr, nil)
reqSend.onRegionError(s.bo, ctxTiFlash, nil, regionErr)
// check leader read should not go to tiflash
lctx, err = s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadLeader, 0)
@ -1024,51 +1021,6 @@ func (s *testRegionCacheSuite) TestRegionEpochOnTiFlash() {
s.NotEqual(lctx.Peer.Id, s.peer1)
}
func (s *testRegionCacheSuite) TestRegionDataNotReady() {
loc1, err := s.cache.LocateKey(s.bo, []byte("a"))
s.Nil(err)
s.Equal(loc1.Region.id, s.region1)
testcases := []struct {
scope string
readType kv.ReplicaReadType
expectPeerID uint64
expectOptsLen int
expectSeed uint32
}{
{
scope: oracle.GlobalTxnScope,
readType: kv.ReplicaReadFollower,
expectPeerID: s.peer2,
expectOptsLen: 1,
expectSeed: 1,
},
{
scope: "local",
readType: kv.ReplicaReadFollower,
expectPeerID: s.peer2,
expectOptsLen: 0,
expectSeed: 1,
},
}
for _, testcase := range testcases {
fctx, err := s.cache.GetTiKVRPCContext(s.bo, loc1.Region, testcase.readType, 0)
s.Nil(err)
s.Equal(fctx.Peer.Id, testcase.expectPeerID)
reqSend := NewRegionRequestSender(s.cache, nil)
regionErr := &errorpb.Error{DataIsNotReady: &errorpb.DataIsNotReady{}}
var opts []StoreSelectorOption
seed := uint32(0)
s.bo.Reset()
req := &tikvrpc.Request{TxnScope: testcase.scope, Context: kvrpcpb.Context{StaleRead: true}, ReplicaReadSeed: &seed}
retry, err := reqSend.onRegionError(s.bo, fctx, req, regionErr, &opts)
s.Nil(err)
s.True(retry)
s.Equal(len(opts), testcase.expectOptsLen)
s.Equal(*req.GetReplicaReadSeed(), testcase.expectSeed)
}
}
const regionSplitKeyFormat = "t%08d"
func createClusterWithStoresAndRegions(regionCnt, storeCount int) *mocktikv.Cluster {
@ -1284,64 +1236,6 @@ func (s *testRegionCacheSuite) TestMixedReadFallback() {
s.Equal(ctx.Peer.Id, s.peer2)
}
func (s *testRegionCacheSuite) TestFollowerMeetEpochNotMatch() {
// 3 nodes and no.1 is region1 leader.
store3 := s.cluster.AllocID()
peer3 := s.cluster.AllocID()
s.cluster.AddStore(store3, s.storeAddr(store3))
s.cluster.AddPeer(s.region1, store3, peer3)
s.cluster.ChangeLeader(s.region1, s.peer1)
// Check the two regions.
loc1, err := s.cache.LocateKey(s.bo, []byte("a"))
s.Nil(err)
s.Equal(loc1.Region.id, s.region1)
reqSend := NewRegionRequestSender(s.cache, nil)
// follower read failed on store2
followReqSeed := uint32(0)
ctxFollower1, err := s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadFollower, followReqSeed)
s.Nil(err)
s.Equal(ctxFollower1.Peer.Id, s.peer2)
s.Equal(ctxFollower1.Store.storeID, s.store2)
regionErr := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}
reqSend.onRegionError(s.bo, ctxFollower1, &tikvrpc.Request{ReplicaReadSeed: &followReqSeed}, regionErr, nil)
s.Equal(followReqSeed, uint32(1))
regionErr = &errorpb.Error{RegionNotFound: &errorpb.RegionNotFound{}}
reqSend.onRegionError(s.bo, ctxFollower1, &tikvrpc.Request{ReplicaReadSeed: &followReqSeed}, regionErr, nil)
s.Equal(followReqSeed, uint32(2))
}
func (s *testRegionCacheSuite) TestMixedMeetEpochNotMatch() {
// 3 nodes and no.1 is region1 leader.
store3 := s.cluster.AllocID()
peer3 := s.cluster.AllocID()
s.cluster.AddStore(store3, s.storeAddr(store3))
s.cluster.AddPeer(s.region1, store3, peer3)
s.cluster.ChangeLeader(s.region1, s.peer1)
// Check the two regions.
loc1, err := s.cache.LocateKey(s.bo, []byte("a"))
s.Nil(err)
s.Equal(loc1.Region.id, s.region1)
reqSend := NewRegionRequestSender(s.cache, nil)
// follower read failed on store1
followReqSeed := uint32(0)
ctxFollower1, err := s.cache.GetTiKVRPCContext(s.bo, loc1.Region, kv.ReplicaReadMixed, followReqSeed)
s.Nil(err)
s.Equal(ctxFollower1.Peer.Id, s.peer1)
s.Equal(ctxFollower1.Store.storeID, s.store1)
regionErr := &errorpb.Error{EpochNotMatch: &errorpb.EpochNotMatch{}}
reqSend.onRegionError(s.bo, ctxFollower1, &tikvrpc.Request{ReplicaReadSeed: &followReqSeed}, regionErr, nil)
s.Equal(followReqSeed, uint32(1))
}
func (s *testRegionCacheSuite) TestPeersLenChange() {
// 2 peers [peer1, peer2] and let peer2 become leader
loc, err := s.cache.LocateKey(s.bo, []byte("a"))


@ -60,7 +60,6 @@ import (
"github.com/tikv/client-go/v2/internal/retry"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/util"
)
@ -99,13 +98,13 @@ func LoadShuttingDown() uint32 {
// For other region errors, since the region range has changed, the request may need to
// be split, so we simply return the error to the caller.
type RegionRequestSender struct {
regionCache *RegionCache
client client.Client
storeAddr string
rpcError error
leaderReplicaSelector *replicaSelector
failStoreIDs map[uint64]struct{}
failProxyStoreIDs map[uint64]struct{}
regionCache *RegionCache
client client.Client
storeAddr string
rpcError error
replicaSelector *replicaSelector
failStoreIDs map[uint64]struct{}
failProxyStoreIDs map[uint64]struct{}
RegionRequestRuntimeStats
}
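
A minimal sketch of how the sender is typically driven (not part of this patch; the cache, client, Backoffer, and region location are assumed to be set up already, as in the tests further below):

func sendGet(cache *RegionCache, cli client.Client, bo *retry.Backoffer, regionID RegionVerID) (*tikvrpc.Response, error) {
    sender := NewRegionRequestSender(cache, cli)
    req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("k")}, kvrpcpb.Context{})
    // SendReq obtains an RPCContext from the replicaSelector and retries on region errors.
    return sender.SendReq(bo, req, regionID, time.Second)
}
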
@ -240,14 +239,20 @@ type replica struct {
attempts int
}
func (r *replica) isEpochStale() bool {
return r.epoch != atomic.LoadUint32(&r.store.epoch)
}
func (r *replica) isExhausted(maxAttempt int) bool {
return r.attempts >= maxAttempt
}
type replicaSelector struct {
regionCache *RegionCache
region *Region
regionStore *regionStore
// replicas contains all TiKV replicas for now and the leader is at the
// head of the slice.
replicas []*replica
state selectorState
replicas []*replica
state selectorState
// replicas[targetIdx] is the replica handling the request this time
targetIdx AccessIndex
// replicas[proxyIdx] is the store used to redirect requests this time
@ -317,7 +322,7 @@ type accessKnownLeader struct {
func (state *accessKnownLeader) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
leader := selector.replicas[state.leaderIdx]
if leader.attempts >= maxReplicaAttempt {
if leader.isExhausted(maxReplicaAttempt) {
selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
return nil, stateChanged{}
}
@ -331,7 +336,7 @@ func (state *accessKnownLeader) onSendFailure(bo *retry.Backoffer, selector *rep
selector.state = &accessByKnownProxy{leaderIdx: state.leaderIdx}
return
}
if liveness != reachable || selector.targetReplica().attempts >= maxReplicaAttempt {
if liveness != reachable || selector.targetReplica().isExhausted(maxReplicaAttempt) {
selector.state = &tryFollower{leaderIdx: state.leaderIdx, lastIdx: state.leaderIdx}
}
if liveness != reachable {
@ -366,7 +371,7 @@ func (state *tryFollower) next(bo *retry.Backoffer, selector *replicaSelector) (
}
targetReplica = selector.replicas[idx]
// Each follower is only tried once
if targetReplica.attempts == 0 {
if !targetReplica.isExhausted(1) {
state.lastIdx = idx
selector.targetIdx = idx
break
@ -429,8 +434,7 @@ func (state *accessByKnownProxy) onNoLeader(selector *replicaSelector) {
selector.state = &invalidLeader{}
}
// tryNewProxy is the state where we try to find a node from followers
// as proxy.
// tryNewProxy is the state where we try to find a node from followers as proxy.
type tryNewProxy struct {
stateBase
leaderIdx AccessIndex
@ -477,7 +481,7 @@ func (state *tryNewProxy) next(bo *retry.Backoffer, selector *replicaSelector) (
func (state *tryNewProxy) isCandidate(idx AccessIndex, replica *replica) bool {
// Try each peer only once
return idx != state.leaderIdx && replica.attempts == 0
return idx != state.leaderIdx && !replica.isExhausted(1)
}
func (state *tryNewProxy) onSendSuccess(selector *replicaSelector) {
@ -494,6 +498,79 @@ func (state *tryNewProxy) onNoLeader(selector *replicaSelector) {
selector.state = &invalidLeader{}
}
// accessFollower is the state where we are sending requests to TiKV followers.
// If there is no suitable follower, requests will be sent to the leader as a fallback.
type accessFollower struct {
stateBase
// If tryLeader is true, the request can also be sent to the leader.
tryLeader bool
isGlobalStaleRead bool
option storeSelectorOp
leaderIdx AccessIndex
lastIdx AccessIndex
}
func (state *accessFollower) next(bo *retry.Backoffer, selector *replicaSelector) (*RPCContext, error) {
if state.lastIdx < 0 {
if state.tryLeader {
state.lastIdx = AccessIndex(rand.Intn(len(selector.replicas)))
} else {
if len(selector.replicas) <= 1 {
state.lastIdx = state.leaderIdx
} else {
// Randomly select a non-leader peer
state.lastIdx = AccessIndex(rand.Intn(len(selector.replicas) - 1))
if state.lastIdx >= state.leaderIdx {
state.lastIdx++
}
}
}
} else {
// A stale read request will retry the leader or the next peer on error.
// If txnScope is global, we only retry the leader by using the WithLeaderOnly option;
// if txnScope is local, we retry both the other peers and the leader by the replicaSelector's strategy.
if state.isGlobalStaleRead {
WithLeaderOnly()(&state.option)
}
state.lastIdx++
}
for i := 0; i < len(selector.replicas) && !state.option.leaderOnly; i++ {
idx := AccessIndex((int(state.lastIdx) + i) % len(selector.replicas))
if state.isCandidate(idx, selector.replicas[idx]) {
state.lastIdx = idx
selector.targetIdx = idx
break
}
}
// If there is no candidate, fall back to the leader.
if selector.targetIdx < 0 {
leader := selector.replicas[state.leaderIdx]
if leader.isEpochStale() || leader.isExhausted(1) {
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("exhausted").Inc()
selector.invalidateRegion()
return nil, nil
}
state.lastIdx = state.leaderIdx
selector.targetIdx = state.leaderIdx
}
return selector.buildRPCContext(bo)
}
func (state *accessFollower) onSendFailure(bo *retry.Backoffer, selector *replicaSelector, cause error) {
if selector.checkLiveness(bo, selector.targetReplica()) != reachable {
selector.invalidateReplicaStore(selector.targetReplica(), cause)
}
}
func (state *accessFollower) isCandidate(idx AccessIndex, replica *replica) bool {
return !replica.isEpochStale() && !replica.isExhausted(1) &&
// The request can only be sent to the leader.
((state.option.leaderOnly && idx == state.leaderIdx) ||
// Choose a replica with matched labels.
(!state.option.leaderOnly && (state.tryLeader || idx != state.leaderIdx) && replica.store.IsLabelsMatch(state.option.labels)))
}
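
To make the scan in next() above easier to follow, here is a stripped-down, standalone sketch of the wrap-around candidate search; the helper name and signature are illustrative only and not part of the client:

func scanCandidates(replicaCnt, lastIdx, leaderIdx int, isCandidate func(int) bool) (int, bool) {
    // Probe at most replicaCnt slots, starting from lastIdx and wrapping around.
    for i := 0; i < replicaCnt; i++ {
        idx := (lastIdx + i) % replicaCnt
        if isCandidate(idx) {
            return idx, true
        }
    }
    // No suitable follower: the caller falls back to the leader at leaderIdx,
    // or invalidates the region if the leader is stale or already exhausted.
    return leaderIdx, false
}
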
type invalidStore struct {
stateBase
}
@ -514,7 +591,9 @@ func (state *invalidLeader) next(_ *retry.Backoffer, _ *replicaSelector) (*RPCCo
return nil, nil
}
func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID) (*replicaSelector, error) {
// newReplicaSelector creates a replicaSelector which selects replicas according to reqType and opts.
// opts is currently only effective for follower read.
func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID, req *tikvrpc.Request, opts ...StoreSelectorOption) (*replicaSelector, error) {
cachedRegion := regionCache.GetCachedRegionWithRLock(regionID)
if cachedRegion == nil || !cachedRegion.isValid() {
return nil, nil
@ -530,11 +609,26 @@ func newReplicaSelector(regionCache *RegionCache, regionID RegionVerID) (*replic
})
}
var state selectorState
if regionCache.enableForwarding && regionStore.proxyTiKVIdx >= 0 {
state = &accessByKnownProxy{leaderIdx: regionStore.workTiKVIdx}
if !req.ReplicaReadType.IsFollowerRead() {
if regionCache.enableForwarding && regionStore.proxyTiKVIdx >= 0 {
state = &accessByKnownProxy{leaderIdx: regionStore.workTiKVIdx}
} else {
state = &accessKnownLeader{leaderIdx: regionStore.workTiKVIdx}
}
} else {
state = &accessKnownLeader{leaderIdx: regionStore.workTiKVIdx}
option := storeSelectorOp{}
for _, op := range opts {
op(&option)
}
state = &accessFollower{
tryLeader: req.ReplicaReadType == kv.ReplicaReadMixed,
isGlobalStaleRead: req.IsGlobalStaleRead(),
option: option,
leaderIdx: regionStore.workTiKVIdx,
lastIdx: -1,
}
}
return &replicaSelector{
regionCache,
cachedRegion,
@ -598,15 +692,21 @@ func (s *replicaSelector) refreshRegionStore() {
return
}
// If the leader has changed, it means a recent request succeeded an RPC on the new
// leader. Give the leader an additional chance.
// If the leader has changed, it means a recent request succeeded an RPC
// on the new leader.
if oldRegionStore.workTiKVIdx != newRegionStore.workTiKVIdx {
newLeaderIdx := newRegionStore.workTiKVIdx
s.state = &accessKnownLeader{leaderIdx: newLeaderIdx}
if s.replicas[newLeaderIdx].attempts == maxReplicaAttempt {
s.replicas[newLeaderIdx].attempts--
switch state := s.state.(type) {
case *accessFollower:
state.leaderIdx = newRegionStore.workTiKVIdx
default:
// Try the new leader and give it an additional chance if the
// request is sent to the leader.
newLeaderIdx := newRegionStore.workTiKVIdx
s.state = &accessKnownLeader{leaderIdx: newLeaderIdx}
if s.replicas[newLeaderIdx].attempts == maxReplicaAttempt {
s.replicas[newLeaderIdx].attempts--
}
}
return
}
}
@ -614,8 +714,11 @@ func (s *replicaSelector) buildRPCContext(bo *retry.Backoffer) (*RPCContext, err
targetReplica, proxyReplica := s.targetReplica(), s.proxyReplica()
// Backoff and retry if no replica is selected or the selected replica is stale
if targetReplica == nil || s.isReplicaStoreEpochStale(targetReplica) ||
(proxyReplica != nil && s.isReplicaStoreEpochStale(proxyReplica)) {
if targetReplica == nil || targetReplica.isEpochStale() ||
(proxyReplica != nil && proxyReplica.isEpochStale()) {
// TODO(youjiali1995): Is it necessary to invalidate the region?
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("stale_store").Inc()
s.invalidateRegion()
return nil, nil
}
@ -656,17 +759,6 @@ func (s *replicaSelector) buildRPCContext(bo *retry.Backoffer) (*RPCContext, err
return rpcCtx, nil
}
func (s *replicaSelector) isReplicaStoreEpochStale(replica *replica) bool {
storeFailEpoch := atomic.LoadUint32(&replica.store.epoch)
if storeFailEpoch != replica.epoch {
// TODO(youjiali1995): Is it necessary to invalidate the region?
metrics.TiKVReplicaSelectorFailureCounter.WithLabelValues("stale_store").Inc()
s.invalidateRegion()
return true
}
return false
}
func (s *replicaSelector) onSendFailure(bo *retry.Backoffer, err error) {
metrics.RegionCacheCounterWithSendFail.Inc()
s.state.onSendFailure(bo, s, err)
@ -720,7 +812,7 @@ func (s *replicaSelector) updateLeader(leader *metapb.Peer) {
}
for i, replica := range s.replicas {
if isSamePeer(replica.peer, leader) {
if replica.attempts == maxReplicaAttempt {
if replica.isExhausted(maxReplicaAttempt) {
// Give the replica one more chance; because each follower is tried only once,
// it won't result in infinite retries.
replica.attempts = maxReplicaAttempt - 1
@ -755,25 +847,14 @@ func (s *RegionRequestSender) getRPCContext(
) (*RPCContext, error) {
switch et {
case tikvrpc.TiKV:
// Now only requests sent to the replica leader will use the replica selector to get
// the RPC context.
// TODO(youjiali1995): make all requests use the replica selector.
if req.ReplicaReadType == kv.ReplicaReadLeader {
if s.leaderReplicaSelector == nil {
selector, err := newReplicaSelector(s.regionCache, regionID)
if selector == nil || err != nil {
return nil, err
}
s.leaderReplicaSelector = selector
if s.replicaSelector == nil {
selector, err := newReplicaSelector(s.regionCache, regionID, req, opts...)
if selector == nil || err != nil {
return nil, err
}
return s.leaderReplicaSelector.next(bo)
s.replicaSelector = selector
}
var seed uint32
if req.ReplicaReadSeed != nil {
seed = *req.ReplicaReadSeed
}
return s.regionCache.GetTiKVRPCContext(bo, regionID, req.ReplicaReadType, seed, opts...)
return s.replicaSelector.next(bo)
case tikvrpc.TiFlash:
return s.regionCache.GetTiFlashRPCContext(bo, regionID, true)
case tikvrpc.TiDB:
@ -784,7 +865,7 @@ func (s *RegionRequestSender) getRPCContext(
}
func (s *RegionRequestSender) reset() {
s.leaderReplicaSelector = nil
s.replicaSelector = nil
s.failStoreIDs = nil
s.failProxyStoreIDs = nil
}
@ -869,9 +950,6 @@ func (s *RegionRequestSender) SendReqCtx(
if err != nil {
return nil, nil, err
}
if rpcCtx != nil {
rpcCtx.tryTimes = tryTimes
}
if _, err := util.EvalFailpoint("invalidCacheAndRetry"); err == nil {
// cooperate with tikvclient/setGcResolveMaxBackoff
@ -922,7 +1000,7 @@ func (s *RegionRequestSender) SendReqCtx(
return nil, nil, errors.Trace(err)
}
if regionErr != nil {
retry, err = s.onRegionError(bo, rpcCtx, req, regionErr, &opts)
retry, err = s.onRegionError(bo, rpcCtx, req, regionErr)
if err != nil {
return nil, nil, errors.Trace(err)
}
@ -931,8 +1009,8 @@ func (s *RegionRequestSender) SendReqCtx(
continue
}
} else {
if s.leaderReplicaSelector != nil {
s.leaderReplicaSelector.onSendSuccess()
if s.replicaSelector != nil {
s.replicaSelector.onSendSuccess()
}
}
return resp, rpcCtx, nil
@ -1171,8 +1249,8 @@ func (s *RegionRequestSender) onSendFail(bo *retry.Backoffer, ctx *RPCContext, e
}
if ctx.Meta != nil {
if s.leaderReplicaSelector != nil {
s.leaderReplicaSelector.onSendFailure(bo, err)
if s.replicaSelector != nil {
s.replicaSelector.onSendFailure(bo, err)
} else {
s.regionCache.OnSendFail(bo, ctx, s.NeedReloadRegion(ctx), err)
}
@ -1237,19 +1315,12 @@ func regionErrorToLabel(e *errorpb.Error) string {
return "unknown"
}
func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, regionErr *errorpb.Error, opts *[]StoreSelectorOption) (shouldRetry bool, err error) {
func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext, req *tikvrpc.Request, regionErr *errorpb.Error) (shouldRetry bool, err error) {
if span := opentracing.SpanFromContext(bo.GetCtx()); span != nil && span.Tracer() != nil {
span1 := span.Tracer().StartSpan("tikv.onRegionError", opentracing.ChildOf(span.Context()))
defer span1.Finish()
bo.SetCtx(opentracing.ContextWithSpan(bo.GetCtx(), span1))
}
// Stale Read request will retry the leader or next peer on error,
// if txnScope is global, we will only retry the leader by using the WithLeaderOnly option,
// if txnScope is local, we will retry both other peers and the leader by the increasing seed.
if ctx.tryTimes < 1 && req != nil && req.TxnScope == oracle.GlobalTxnScope && req.GetStaleRead() {
*opts = append(*opts, WithLeaderOnly())
}
seed := req.GetReplicaReadSeed()
// NOTE: Please add the region error handler in the same order of errorpb.Error.
metrics.TiKVRegionErrorCounter.WithLabelValues(regionErrorToLabel(regionErr)).Inc()
@ -1260,8 +1331,8 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
zap.String("notLeader", notLeader.String()),
zap.String("ctx", ctx.String()))
if s.leaderReplicaSelector != nil {
return s.leaderReplicaSelector.onNotLeader(bo, ctx, notLeader)
if s.replicaSelector != nil {
return s.replicaSelector.onNotLeader(bo, ctx, notLeader)
} else if notLeader.GetLeader() == nil {
// The peer doesn't know who is the current leader. Generally it's because
// the Raft group is in an election, but it's possible that the peer is
@ -1289,11 +1360,6 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
// This peer is removed from the region. Invalidate the region since it's too stale.
if regionErr.GetRegionNotFound() != nil {
if seed != nil {
logutil.BgLogger().Debug("tikv reports `RegionNotFound` in follow-reader",
zap.Stringer("ctx", ctx), zap.Uint32("seed", *seed))
*seed = *seed + 1
}
s.regionCache.InvalidateCachedRegion(ctx.Region)
return false, nil
}
@ -1308,12 +1374,9 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
logutil.BgLogger().Debug("tikv reports `EpochNotMatch` retry later",
zap.Stringer("EpochNotMatch", epochNotMatch),
zap.Stringer("ctx", ctx))
if seed != nil {
*seed = *seed + 1
}
retry, err := s.regionCache.OnRegionEpochNotMatch(bo, ctx, epochNotMatch.CurrentRegions)
if !retry && s.leaderReplicaSelector != nil {
s.leaderReplicaSelector.invalidateRegion()
if !retry && s.replicaSelector != nil {
s.replicaSelector.invalidateRegion()
}
return retry, errors.Trace(err)
}
@ -1338,9 +1401,9 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
// but we don't handle it now.
if regionErr.GetStaleCommand() != nil {
logutil.BgLogger().Debug("tikv reports `StaleCommand`", zap.Stringer("ctx", ctx))
if s.leaderReplicaSelector != nil {
if s.replicaSelector != nil {
// Needn't backoff because the new leader should be elected soon
// and the leaderReplicaSelector will try the next peer.
// and the replicaSelector will try the next peer.
} else {
err = bo.Backoff(retry.BoStaleCmd, errors.Errorf("stale command, ctx: %v", ctx))
if err != nil {
@ -1384,9 +1447,6 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
if err != nil {
return false, errors.Trace(err)
}
if seed != nil {
*seed = *seed + 1
}
return true, nil
}
@ -1396,9 +1456,6 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
zap.Uint64("store-id", ctx.Store.storeID),
zap.Uint64("region-id", regionErr.GetRegionNotInitialized().GetRegionId()),
zap.Stringer("ctx", ctx))
if seed != nil {
*seed = *seed + 1
}
// The region can't provide service until split or merge finished, so backoff.
err = bo.Backoff(retry.BoRegionScheduling, errors.Errorf("read index not ready, ctx: %v", ctx))
if err != nil {
@ -1431,9 +1488,6 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
if err != nil {
return false, errors.Trace(err)
}
if seed != nil {
*seed = *seed + 1
}
return true, nil
}
@ -1441,7 +1495,7 @@ func (s *RegionRequestSender) onRegionError(bo *retry.Backoffer, ctx *RPCContext
zap.Stringer("regionErr", regionErr),
zap.Stringer("ctx", ctx))
if s.leaderReplicaSelector != nil {
if s.replicaSelector != nil {
// Try the next replica.
return true, nil
}
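
Taken together, the new constructor and the accessFollower state are exercised roughly as follows (a sketch mirroring the tests in the next file; cache, regionLoc, labels, and bo are assumed to exist):

req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kv.ReplicaReadFollower, nil)
selector, err := newReplicaSelector(cache, regionLoc.Region, req, WithMatchLabels(labels))
if err == nil && selector != nil {
    // next returns an RPCContext for a label-matching follower, or the leader as a fallback.
    rpcCtx, err := selector.next(bo)
    _, _ = rpcCtx, err
}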


@ -50,6 +50,7 @@ import (
"github.com/tikv/client-go/v2/internal/mockstore/mocktikv"
"github.com/tikv/client-go/v2/internal/retry"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
"github.com/tikv/client-go/v2/tikvrpc"
)
@ -86,35 +87,6 @@ func (s *testRegionRequestToThreeStoresSuite) TearDownTest() {
s.mvccStore.Close()
}
func (s *testRegionRequestToThreeStoresSuite) TestGetRPCContext() {
// Load the bootstrapped region into the cache.
_, err := s.cache.BatchLoadRegionsFromKey(s.bo, []byte{}, 1)
s.Nil(err)
var seed uint32
var regionID = RegionVerID{s.regionID, 0, 0}
req := tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kv.ReplicaReadLeader, &seed)
rpcCtx, err := s.regionRequestSender.getRPCContext(s.bo, req, regionID, tikvrpc.TiKV)
s.Nil(err)
s.Equal(rpcCtx.Peer.Id, s.leaderPeer)
req.ReplicaReadType = kv.ReplicaReadFollower
rpcCtx, err = s.regionRequestSender.getRPCContext(s.bo, req, regionID, tikvrpc.TiKV)
s.Nil(err)
s.NotEqual(rpcCtx.Peer.Id, s.leaderPeer)
req.ReplicaReadType = kv.ReplicaReadMixed
rpcCtx, err = s.regionRequestSender.getRPCContext(s.bo, req, regionID, tikvrpc.TiKV)
s.Nil(err)
s.Equal(rpcCtx.Peer.Id, s.leaderPeer)
seed = 1
rpcCtx, err = s.regionRequestSender.getRPCContext(s.bo, req, regionID, tikvrpc.TiKV)
s.Nil(err)
s.NotEqual(rpcCtx.Peer.Id, s.leaderPeer)
}
func (s *testRegionRequestToThreeStoresSuite) TestStoreTokenLimit() {
req := tikvrpc.NewRequest(tikvrpc.CmdPrewrite, &kvrpcpb.PrewriteRequest{}, kvrpcpb.Context{})
region, err := s.cache.LocateRegionByID(s.bo, s.regionID)
@ -306,6 +278,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.NotNil(regionLoc)
region := s.cache.GetCachedRegionWithRLock(regionLoc.Region)
regionStore := region.getStore()
req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kvrpcpb.Context{})
// Create a fake region and change its leader to the last peer.
regionStore = regionStore.clone()
@ -331,7 +304,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
cache.insertRegionToCache(region)
// Verify creating the replicaSelector.
replicaSelector, err := newReplicaSelector(cache, regionLoc.Region)
replicaSelector, err := newReplicaSelector(cache, regionLoc.Region, req)
s.NotNil(replicaSelector)
s.Nil(err)
s.Equal(replicaSelector.region, region)
@ -396,7 +369,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
// Test switching to tryFollower if leader is unreachable
region.lastAccess = time.Now().Unix()
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region)
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
s.Nil(err)
s.NotNil(replicaSelector)
cache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState {
@ -417,7 +390,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
// Test switching to tryNewProxy if leader is unreachable and forwarding is enabled
refreshEpochs(regionStore)
cache.enableForwarding = true
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region)
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
s.Nil(err)
s.NotNil(replicaSelector)
cache.testingKnobs.mockRequestLiveness = func(s *Store, bo *retry.Backoffer) livenessState {
@ -461,7 +434,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
// Test initial state is accessByKnownProxy when proxyTiKVIdx is valid
refreshEpochs(regionStore)
cache.enableForwarding = true
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region)
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
s.Nil(err)
s.NotNil(replicaSelector)
state2, ok := replicaSelector.state.(*accessByKnownProxy)
@ -483,6 +456,103 @@ func (s *testRegionRequestToThreeStoresSuite) TestReplicaSelector() {
s.Equal(replicaSelector.targetReplica().attempts, 2)
s.Equal(replicaSelector.proxyReplica().attempts, 1)
// Test accessFollower state with kv.ReplicaReadFollower request type.
req = tikvrpc.NewReplicaReadRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{}, kv.ReplicaReadFollower, nil)
refreshEpochs(regionStore)
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
s.Nil(err)
s.NotNil(replicaSelector)
state3, ok := replicaSelector.state.(*accessFollower)
s.True(ok)
s.False(state3.tryLeader)
s.Equal(regionStore.workTiKVIdx, state3.leaderIdx)
s.Equal(state3.lastIdx, AccessIndex(-1))
lastIdx := AccessIndex(-1)
for i := 0; i < regionStore.accessStoreNum(tiKVOnly)-1; i++ {
rpcCtx, err := replicaSelector.next(s.bo)
s.Nil(err)
// Should switch to the next follower.
s.NotEqual(lastIdx, state3.lastIdx)
// Shouldn't access the leader if followers aren't exhausted.
s.NotEqual(regionStore.workTiKVIdx, state3.lastIdx)
s.Equal(replicaSelector.targetIdx, state3.lastIdx)
assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[replicaSelector.targetIdx], nil)
lastIdx = state3.lastIdx
}
// Fall back to the leader once.
rpcCtx, err = replicaSelector.next(s.bo)
s.Nil(err)
s.Equal(regionStore.workTiKVIdx, state3.lastIdx)
s.Equal(replicaSelector.targetIdx, state3.lastIdx)
assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
// All replicas are exhausted.
rpcCtx, err = replicaSelector.next(s.bo)
s.Nil(rpcCtx)
s.Nil(err)
// Test accessFollower state filtering epoch-stale stores.
region.lastAccess = time.Now().Unix()
refreshEpochs(regionStore)
// Mark all followers as stale.
tiKVNum := regionStore.accessStoreNum(tiKVOnly)
for i := 1; i < tiKVNum; i++ {
regionStore.storeEpochs[(regionStore.workTiKVIdx+AccessIndex(i))%AccessIndex(tiKVNum)]++
}
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
s.NotNil(replicaSelector)
s.Nil(err)
state3 = replicaSelector.state.(*accessFollower)
// Should fall back to the leader immediately.
rpcCtx, err = replicaSelector.next(s.bo)
s.Nil(err)
s.Equal(regionStore.workTiKVIdx, state3.lastIdx)
s.Equal(replicaSelector.targetIdx, state3.lastIdx)
assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
// Test accessFollower state filtering out stores whose labels don't match.
region.lastAccess = time.Now().Unix()
refreshEpochs(regionStore)
labels := []*metapb.StoreLabel{
{
Key: "a",
Value: "b",
},
}
regionStore.workTiKVIdx = AccessIndex(0)
accessIdx := AccessIndex(regionStore.accessStoreNum(tiKVOnly) - 1)
_, store := regionStore.accessStore(tiKVOnly, accessIdx)
store.labels = labels
for i := 0; i < 5; i++ {
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req, WithMatchLabels(labels))
s.NotNil(replicaSelector)
s.Nil(err)
rpcCtx, err = replicaSelector.next(s.bo)
s.Nil(err)
assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[accessIdx], nil)
}
// Test accessFollower state with leaderOnly option
region.lastAccess = time.Now().Unix()
refreshEpochs(regionStore)
for i := 0; i < 5; i++ {
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req, WithLeaderOnly())
s.NotNil(replicaSelector)
s.Nil(err)
rpcCtx, err = replicaSelector.next(s.bo)
s.Nil(err)
// Should always access the leader.
assertRPCCtxEqual(rpcCtx, replicaSelector.replicas[regionStore.workTiKVIdx], nil)
}
// Test accessFollower state with kv.ReplicaReadMixed request type.
region.lastAccess = time.Now().Unix()
refreshEpochs(regionStore)
req.ReplicaReadType = kv.ReplicaReadMixed
replicaSelector, err = newReplicaSelector(cache, regionLoc.Region, req)
s.NotNil(replicaSelector)
s.Nil(err)
// Invalidate the region if the leader is not in the region.
region.lastAccess = time.Now().Unix()
replicaSelector.updateLeader(&metapb.Peer{Id: s.cluster.AllocID(), StoreId: s.cluster.AllocID()})
@ -505,7 +575,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
s.NotNil(region)
reloadRegion := func() {
s.regionRequestSender.leaderReplicaSelector.region.invalidate(Other)
s.regionRequestSender.replicaSelector.region.invalidate(Other)
region, _ = s.cache.LocateRegionByID(s.bo, s.regionID)
}
@ -535,7 +605,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
resp, err = sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.NotNil(resp)
s.Equal(sender.leaderReplicaSelector.targetIdx, AccessIndex(1))
s.Equal(sender.replicaSelector.targetIdx, AccessIndex(1))
s.True(bo.GetTotalBackoffTimes() == 1)
s.cluster.StartStore(s.storeIDs[0])
@ -544,7 +614,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
resp, err = sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.NotNil(resp)
s.Equal(sender.leaderReplicaSelector.targetIdx, AccessIndex(1))
s.Equal(sender.replicaSelector.targetIdx, AccessIndex(1))
s.True(bo.GetTotalBackoffTimes() == 0)
// Switch to the next peer due to leader failure but the new leader is not elected.
@ -574,7 +644,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.Equal(bo.GetTotalBackoffTimes(), 3)
s.False(sender.leaderReplicaSelector.region.isValid())
s.False(sender.replicaSelector.region.isValid())
s.cluster.ChangeLeader(s.regionID, s.peerIDs[0])
// The leader store is alive but can't provide service.
@ -588,7 +658,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
resp, err = sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.False(sender.leaderReplicaSelector.region.isValid())
s.False(sender.replicaSelector.region.isValid())
s.Equal(bo.GetTotalBackoffTimes(), maxReplicaAttempt+2)
s.cluster.StartStore(s.storeIDs[0])
@ -622,7 +692,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
resp, err := sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.False(sender.leaderReplicaSelector.region.isValid())
s.False(sender.replicaSelector.region.isValid())
s.Equal(bo.GetTotalBackoffTimes(), maxReplicaAttempt+2)
}()
}
@ -642,7 +712,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
resp, err := sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.False(sender.leaderReplicaSelector.region.isValid())
s.False(sender.replicaSelector.region.isValid())
s.Equal(bo.GetTotalBackoffTimes(), 0)
}()
@ -661,7 +731,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
resp, err := sender.SendReq(bo, req, region.Region, time.Second)
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.False(sender.leaderReplicaSelector.region.isValid())
s.False(sender.replicaSelector.region.isValid())
s.Equal(bo.GetTotalBackoffTimes(), 0)
}()
@ -694,7 +764,7 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
regionErr, _ := resp.GetRegionError()
s.NotNil(regionErr)
}
s.False(sender.leaderReplicaSelector.region.isValid())
s.False(sender.replicaSelector.region.isValid())
s.Equal(bo.GetTotalBackoffTimes(), 0)
}()
}
@ -712,8 +782,40 @@ func (s *testRegionRequestToThreeStoresSuite) TestSendReqWithReplicaSelector() {
s.Nil(err)
s.True(hasFakeRegionError(resp))
s.True(bo.GetTotalBackoffTimes() == 3)
s.False(sender.leaderReplicaSelector.region.isValid())
s.False(sender.replicaSelector.region.isValid())
for _, store := range s.storeIDs {
s.cluster.StartStore(store)
}
// Verify that we switch to the leader immediately when stale read requests with global txn scope meet region errors.
s.cluster.ChangeLeader(region.Region.id, s.peerIDs[0])
reloadRegion()
req = tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")})
req.TxnScope = oracle.GlobalTxnScope
req.EnableStaleRead()
for i := 0; i < 5; i++ {
// The request may be sent to the leader directly. We have to distinguish it.
failureOnFollower := false
s.regionRequestSender.client = &fnClient{func(ctx context.Context, addr string, req *tikvrpc.Request, timeout time.Duration) (response *tikvrpc.Response, err error) {
if addr != s.cluster.GetStore(s.storeIDs[0]).Address {
failureOnFollower = true
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{RegionError: &errorpb.Error{}}}, nil
}
return &tikvrpc.Response{Resp: &kvrpcpb.GetResponse{}}, nil
}}
sender.SendReq(bo, req, region.Region, time.Second)
state, ok := sender.replicaSelector.state.(*accessFollower)
s.True(ok)
s.True(!failureOnFollower || state.option.leaderOnly)
totalAttempts := 0
for idx, replica := range sender.replicaSelector.replicas {
totalAttempts += replica.attempts
if idx == int(state.leaderIdx) {
s.Equal(1, replica.attempts)
} else {
s.True(replica.attempts <= 1)
}
}
s.True(totalAttempts <= 2)
}
}


@ -49,6 +49,7 @@ import (
"github.com/pingcap/kvproto/pkg/mpp"
"github.com/pingcap/kvproto/pkg/tikvpb"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/oracle"
)
// CmdType represents the concrete request type in Request or response type in Response.
@ -256,6 +257,11 @@ func (req *Request) EnableStaleRead() {
req.ReplicaRead = false
}
// IsGlobalStaleRead checks if the request is a global stale read request.
func (req *Request) IsGlobalStaleRead() bool {
return req.TxnScope == oracle.GlobalTxnScope && req.GetStaleRead()
}
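
For illustration, the combination this helper checks is built the same way the tests construct it (a sketch; the request fields shown are the only assumptions):

req := tikvrpc.NewRequest(tikvrpc.CmdGet, &kvrpcpb.GetRequest{Key: []byte("key")})
req.TxnScope = oracle.GlobalTxnScope
req.EnableStaleRead()
// req.IsGlobalStaleRead() now reports true, so accessFollower restricts retries to the leader.
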
// IsDebugReq checks whether the req is a debug req.
func (req *Request) IsDebugReq() bool {
switch req.Type {